Skip to content
Snippets Groups Projects
Commit a918c2bf authored by Xin.Zh's avatar Xin.Zh Committed by GitHub
Browse files

Merge pull request #505 from hxmhlt/metadata_report

Add: DelegateMetadataReport & RemoteMetadataService
parents 6352cd07 9c1d8d65
No related branches found
No related tags found
No related merge requests found
Showing
with 976 additions and 68 deletions
......@@ -74,3 +74,7 @@ const (
const (
COMMA_SPLIT_PATTERN = "\\s*[,]+\\s*"
)
const (
SIMPLE_METADATA_SERVICE_NAME = "MetadataService"
)
......@@ -79,6 +79,10 @@ const (
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
PROVIDER_SHUTDOWN_FILTER = "pshutdown"
CONSUMER_SHUTDOWN_FILTER = "cshutdown"
SYNC_REPORT_KEY = "sync.report"
RETRY_PERIOD_KEY = "retry.period"
RETRY_TIMES_KEY = "retry.times"
CYCLE_REPORT_KEY = "cycle.report"
)
const (
......
......@@ -28,14 +28,31 @@ import (
)
var (
instance report.MetadataReport
once sync.Once
instance report.MetadataReport
reportUrl common.URL
once sync.Once
)
// GetMetadataReportInstance ...
func GetMetadataReportInstance(url *common.URL) report.MetadataReport {
// GetMetadataReportInstance will return the instance in lazy mode. Be careful the instance create will only
// execute once.
func GetMetadataReportInstance(selectiveUrl ...*common.URL) report.MetadataReport {
once.Do(func() {
instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url)
var url *common.URL
if len(selectiveUrl) > 0 {
url = selectiveUrl[0]
instance = extension.GetMetadataReportFactory(url.Protocol).CreateMetadataReport(url)
reportUrl = *url
}
})
return instance
}
// GetMetadataReportUrl will return the report instance url
func GetMetadataReportUrl() common.URL {
return reportUrl
}
// SetMetadataReportUrl will only can be used by unit test to mock url
func SetMetadataReportUrl(url common.URL) {
reportUrl = url
}
......@@ -14,6 +14,7 @@ require (
github.com/dubbogo/go-zookeeper v1.0.0
github.com/dubbogo/gost v1.9.0
github.com/emicklei/go-restful/v3 v3.0.0
github.com/go-co-op/gocron v0.1.1
github.com/go-resty/resty/v2 v2.1.0
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1
......
......@@ -134,6 +134,8 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/ghodss/yaml v0.0.0-20150909031657-73d445a93680/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-co-op/gocron v0.1.1 h1:OfDmkqkCguFtFMsm6Eaayci3DADLa8pXvdmOlPU/JcU=
github.com/go-co-op/gocron v0.1.1/go.mod h1:Y9PWlYqDChf2Nbgg7kfS+ZsXHDTZbMZYPEQ0MILqH+M=
github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w=
github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q=
github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo=
......@@ -149,6 +151,7 @@ github.com/go-openapi/jsonpointer v0.0.0-20160704185906-46af16f9f7b1/go.mod h1:+
github.com/go-openapi/jsonreference v0.0.0-20160704190145-13c6e3589ad9/go.mod h1:W3Z9FmVs9qj+KR4zFKmDPGiLdk1D9Rlm7cyMvf57TTg=
github.com/go-openapi/spec v0.0.0-20160808142527-6aced65f8501/go.mod h1:J8+jY1nAiCcj+friV/PDoE1/3eeccG9LYBs0tYvLOWc=
github.com/go-openapi/swag v0.0.0-20160704191624-1d0bd113de87/go.mod h1:DXUve3Dpr1UfpPtxFw+EFuQ41HhCWZfha5jSVRG7C7I=
github.com/go-redis/redis v6.15.5+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE=
github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8=
github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck=
......@@ -394,10 +397,14 @@ github.com/oklog/run v0.0.0-20180308005104-6934b124db28/go.mod h1:dlhp/R75TPv97u
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1 h1:q/mM8GF/n0shIN8SaAZ0V+jnLPzen6WIVZdiwrRlMlo=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/image-spec v1.0.1 h1:JMemWkRwHx4Zj+fVxWoMCFm/8sYGGrUVojFA6h/TRcI=
......@@ -531,6 +538,8 @@ golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b h1:0mm1VjtFUOIlE1SbDlwjYaDxZVDP2S5ou6y0gSgXHu8=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
......@@ -539,6 +548,8 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20170830134202-bb24a47a89ea/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
......
......@@ -19,6 +19,9 @@ package definition
import (
"bytes"
"encoding/json"
"fmt"
"strings"
)
import (
......@@ -26,6 +29,11 @@ import (
"github.com/apache/dubbo-go/common/constant"
)
// ServiceDefiner is a interface of service's definition
type ServiceDefiner interface {
ToBytes() ([]byte, error)
}
// ServiceDefinition is the describer of service definition
type ServiceDefinition struct {
CanonicalName string
......@@ -34,6 +42,36 @@ type ServiceDefinition struct {
Types []TypeDefinition
}
func (def *ServiceDefinition) ToBytes() ([]byte, error) {
return json.Marshal(def)
}
func (def *ServiceDefinition) String() string {
var methodStr strings.Builder
for _, m := range def.Methods {
var paramType strings.Builder
for _, p := range m.ParameterTypes {
paramType.WriteString(fmt.Sprintf("{type:%v}", p))
}
var param strings.Builder
for _, d := range m.Parameters {
param.WriteString(fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName))
}
methodStr.WriteString(fmt.Sprintf("{name:%v,parameterTypes:[%v],returnType:%v,params:[%v] }", m.Name, paramType.String(), m.ReturnType, param.String()))
}
var types strings.Builder
for _, d := range def.Types {
types.WriteString(fmt.Sprintf("{id:%v,type:%v,builderName:%v}", d.Id, d.Type, d.TypeBuilderName))
}
return fmt.Sprintf("{canonicalName:%v, codeSource:%v, methods:[%v], types:[%v]}", def.CanonicalName, def.CodeSource, methodStr.String(), types.String())
}
// FullServiceDefinition is the describer of service definition with parameters
type FullServiceDefinition struct {
ServiceDefinition
Params map[string]string
}
// MethodDefinition is the describer of method definition
type MethodDefinition struct {
Name string
......@@ -53,8 +91,8 @@ type TypeDefinition struct {
}
// BuildServiceDefinition can build service definition which will be used to describe a service
func BuildServiceDefinition(service common.Service, url common.URL) ServiceDefinition {
sd := ServiceDefinition{}
func BuildServiceDefinition(service common.Service, url common.URL) *ServiceDefinition {
sd := &ServiceDefinition{}
sd.CanonicalName = url.Service()
for k, m := range service.Method() {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package definition
import (
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
func TestBuildServiceDefinition(t *testing.T) {
serviceName := "com.ikurento.user.UserProvider"
group := "group1"
version := "0.0.1"
protocol := "dubbo"
beanName := "UserProvider"
url, err := common.NewURL(fmt.Sprintf(
"%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, serviceName, group, version, beanName))
assert.NoError(t, err)
_, err = common.ServiceMap.Register(serviceName, protocol, &UserProvider{})
assert.NoError(t, err)
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
sd := BuildServiceDefinition(*service, url)
assert.Equal(t, "{canonicalName:com.ikurento.user.UserProvider, codeSource:, methods:[{name:GetUser,parameterTypes:[{type:slice}],returnType:ptr,params:[] }], types:[]}", sd.String())
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package definition
import (
"context"
"time"
)
type User struct {
Id string
Name string
Age int32
Time time.Time
}
type UserProvider struct {
}
func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) {
rsp := User{"A001", "Alex Stocks", 18, time.Now()}
return &rsp, nil
}
func (u *UserProvider) Reference() string {
return "UserProvider"
}
func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
......@@ -18,6 +18,7 @@
package identifier
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
)
......@@ -28,6 +29,18 @@ type ServiceMetadataIdentifier struct {
BaseMetadataIdentifier
}
func NewServiceMetadataIdentifier(url common.URL) *ServiceMetadataIdentifier {
return &ServiceMetadataIdentifier{
BaseMetadataIdentifier: BaseMetadataIdentifier{
ServiceInterface: url.Service(),
Version: url.GetParam(constant.VERSION_KEY, ""),
Group: url.GetParam(constant.GROUP_KEY, ""),
Side: url.GetParam(constant.SIDE_KEY, ""),
},
Protocol: url.Protocol,
}
}
// GetIdentifierKey will return string format as service:Version:Group:Side:Protocol:"revision"+Revision
func (mdi *ServiceMetadataIdentifier) GetIdentifierKey() string {
return mdi.BaseMetadataIdentifier.getIdentifierKey(mdi.Protocol, constant.KEY_REVISON_PREFIX+mdi.Revision)
......
......@@ -20,7 +20,7 @@ package identifier
// SubscriberMetadataIdentifier is inherit baseMetaIdentifier with service params: Revision
type SubscriberMetadataIdentifier struct {
Revision string
BaseMetadataIdentifier
MetadataIdentifier
}
// GetIdentifierKey will return string format as service:Version:Group:Side:Revision
......
......@@ -27,11 +27,13 @@ import (
var subscribeMetadataId = &SubscriberMetadataIdentifier{
Revision: "1.0",
BaseMetadataIdentifier: BaseMetadataIdentifier{
ServiceInterface: "org.apache.pkg.mockService",
Version: "1.0.0",
Group: "Group",
Side: "provider",
MetadataIdentifier: MetadataIdentifier{
BaseMetadataIdentifier: BaseMetadataIdentifier{
ServiceInterface: "org.apache.pkg.mockService",
Version: "1.0.0",
Group: "Group",
Side: "provider",
},
},
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package delegate
import (
"encoding/json"
"runtime/debug"
"sync"
"time"
)
import (
"github.com/go-co-op/gocron"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config/instance"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
)
const (
// defaultMetadataReportRetryTimes is defined for max times to retry
defaultMetadataReportRetryTimes int64 = 100
// defaultMetadataReportRetryPeriod is defined for cycle interval to retry, the unit is second
defaultMetadataReportRetryPeriod int64 = 3
// defaultMetadataReportRetryPeriod is defined for cycle report or not
defaultMetadataReportCycleReport bool = true
)
// metadataReportRetry is a scheduler for retrying task
type metadataReportRetry struct {
retryPeriod int64
retryLimit int64
scheduler *gocron.Scheduler
job *gocron.Job
retryCounter *atomic.Int64
// if no failed report, wait how many times to run retry task.
retryTimesIfNonFail int64
}
// newMetadataReportRetry will create a scheduler for retry task
func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) {
s1 := gocron.NewScheduler(time.UTC)
mrr := &metadataReportRetry{
retryPeriod: retryPeriod,
retryLimit: retryLimit,
scheduler: s1,
retryCounter: atomic.NewInt64(0),
retryTimesIfNonFail: 600,
}
newJob, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do(
func() {
mrr.retryCounter.Inc()
logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load())
if mrr.retryCounter.Load() > mrr.retryLimit {
mrr.scheduler.Clear()
} else if retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail {
mrr.scheduler.Clear() // may not interrupt the running job
}
})
mrr.job = newJob
return mrr, err
}
// startRetryTask will make scheduler with retry task run
func (mrr *metadataReportRetry) startRetryTask() {
mrr.scheduler.StartAt(time.Now().Add(500 * time.Millisecond))
mrr.scheduler.Start()
}
// MetadataReport is a absolute delegate for MetadataReport
type MetadataReport struct {
reportUrl common.URL
syncReport bool
metadataReportRetry *metadataReportRetry
failedReports map[*identifier.MetadataIdentifier]interface{}
failedReportsLock sync.RWMutex
// allMetadataReports store all the metdadata reports records in memory
allMetadataReports map[*identifier.MetadataIdentifier]interface{}
allMetadataReportsLock sync.RWMutex
}
// NewMetadataReport will create a MetadataReport with initiation
func NewMetadataReport() (*MetadataReport, error) {
url := instance.GetMetadataReportUrl()
bmr := &MetadataReport{
reportUrl: url,
syncReport: url.GetParamBool(constant.SYNC_REPORT_KEY, false),
failedReports: make(map[*identifier.MetadataIdentifier]interface{}, 4),
allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4),
}
mrr, err := newMetadataReportRetry(
url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod),
url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes),
bmr.retry,
)
if err != nil {
return nil, err
}
bmr.metadataReportRetry = mrr
if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) {
scheduler := gocron.NewScheduler(time.UTC)
_, err := scheduler.Every(1).Day().Do(
func() {
logger.Info("start to publish all metadata in metadata report %v.", url)
bmr.allMetadataReportsLock.RLock()
bmr.doHandlerMetadataCollection(bmr.allMetadataReports)
bmr.allMetadataReportsLock.RUnlock()
})
if err != nil {
return nil, err
}
scheduler.StartAt(time.Now().Add(500 * time.Millisecond))
scheduler.Start()
}
return bmr, nil
}
// retry will do metadata failed reports collection by call metadata report sdk
func (mr *MetadataReport) retry() bool {
mr.failedReportsLock.RLock()
defer mr.failedReportsLock.RUnlock()
return mr.doHandlerMetadataCollection(mr.failedReports)
}
// StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition
func (mr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) {
if mr.syncReport {
mr.storeMetadataTask(common.PROVIDER, identifier, definer)
}
go mr.storeMetadataTask(common.PROVIDER, identifier, definer)
}
// storeMetadataTask will delegate to call remote metadata's sdk to store
func (mr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) {
logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer)
mr.allMetadataReportsLock.Lock()
mr.allMetadataReports[identifier] = definer
mr.allMetadataReportsLock.Unlock()
mr.failedReportsLock.Lock()
delete(mr.failedReports, identifier)
mr.failedReportsLock.Unlock()
// data is store the json marshaled definition
var (
data []byte
err error
)
defer func() {
if r := recover(); r != nil {
mr.failedReportsLock.Lock()
mr.failedReports[identifier] = definer
mr.failedReportsLock.Unlock()
mr.metadataReportRetry.startRetryTask()
logger.Errorf("Failed to put provider metadata %v in %v, cause: %v\n%s\n",
identifier, string(data), r, string(debug.Stack()))
}
}()
data, err = json.Marshal(definer)
if err != nil {
logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %+v", err)
panic(err)
}
report := instance.GetMetadataReportInstance()
if role == common.PROVIDER {
err = report.StoreProviderMetadata(identifier, string(data))
} else if role == common.CONSUMER {
err = report.StoreConsumerMetadata(identifier, string(data))
}
if err != nil {
logger.Errorf("storeProviderMetadataTask error in stage call metadata report to StoreProviderMetadata, msg is %+v", err)
panic(err)
}
}
// StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition
func (mr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) {
if mr.syncReport {
mr.storeMetadataTask(common.CONSUMER, identifier, definer)
}
go mr.storeMetadataTask(common.CONSUMER, identifier, definer)
}
// SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata
func (mr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
report := instance.GetMetadataReportInstance()
if mr.syncReport {
return report.SaveServiceMetadata(identifier, url)
}
go report.SaveServiceMetadata(identifier, url)
return nil
}
// RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata
func (mr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error {
report := instance.GetMetadataReportInstance()
if mr.syncReport {
return report.RemoveServiceMetadata(identifier)
}
go report.RemoveServiceMetadata(identifier)
return nil
}
// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls
func (mr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
report := instance.GetMetadataReportInstance()
return report.GetExportedURLs(identifier)
}
// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data
func (mr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
report := instance.GetMetadataReportInstance()
if mr.syncReport {
return report.SaveSubscribedData(identifier, urls)
}
go report.SaveSubscribedData(identifier, urls)
return nil
}
// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls
func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string {
report := instance.GetMetadataReportInstance()
return report.GetSubscribedURLs(identifier)
}
// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions
func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string {
report := instance.GetMetadataReportInstance()
return report.GetServiceDefinition(identifier)
}
// doHandlerMetadataCollection will store metadata to metadata support with given metadataMap
func (mr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool {
if len(metadataMap) == 0 {
return true
}
for e := range metadataMap {
if common.RoleType(common.PROVIDER).Role() == e.Side {
mr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition))
} else if common.RoleType(common.CONSUMER).Role() == e.Side {
mr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string))
}
}
return false
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package delegate
import (
"fmt"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config/instance"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
)
func TestMetadataReport_MetadataReportRetry(t *testing.T) {
counter := atomic.NewInt64(1)
retry, err := newMetadataReportRetry(1, 10, func() bool {
counter.Add(1)
return true
})
assert.NoError(t, err)
retry.startRetryTask()
itsTime := time.After(2500 * time.Millisecond)
select {
case <-itsTime:
retry.scheduler.Clear()
assert.Equal(t, counter.Load(), int64(3))
logger.Info("over")
}
}
func TestMetadataReport_MetadataReportRetryWithLimit(t *testing.T) {
counter := atomic.NewInt64(1)
retry, err := newMetadataReportRetry(1, 1, func() bool {
counter.Add(1)
return true
})
assert.NoError(t, err)
retry.startRetryTask()
itsTime := time.After(2500 * time.Millisecond)
select {
case <-itsTime:
retry.scheduler.Clear()
assert.Equal(t, counter.Load(), int64(2))
logger.Info("over")
}
}
func mockNewMetadataReport(t *testing.T) *MetadataReport {
syncReportKey := "false"
retryPeroidKey := "3"
retryTimesKey := "100"
cycleReportKey := "true"
url, err := common.NewURL(fmt.Sprintf(
"test://127.0.0.1:20000/?"+constant.SYNC_REPORT_KEY+"=%v&"+constant.RETRY_PERIOD_KEY+"=%v&"+
constant.RETRY_TIMES_KEY+"=%v&"+constant.CYCLE_REPORT_KEY+"=%v",
syncReportKey, retryPeroidKey, retryTimesKey, cycleReportKey))
assert.NoError(t, err)
instance.SetMetadataReportUrl(url)
mtr, err := NewMetadataReport()
assert.NoError(t, err)
assert.NotNil(t, mtr)
return mtr
}
func TestMetadataReport_StoreProviderMetadata(t *testing.T) {
mtr := mockNewMetadataReport(t)
var metadataId = &identifier.MetadataIdentifier{
Application: "app",
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: "com.ikurento.user.UserProvider",
Version: "0.0.1",
Group: "group1",
Side: "provider",
},
}
mtr.StoreProviderMetadata(metadataId, getMockDefinition(metadataId, t))
}
func getMockDefinition(id *identifier.MetadataIdentifier, t *testing.T) *definition.ServiceDefinition {
protocol := "dubbo"
beanName := "UserProvider"
url, err := common.NewURL(fmt.Sprintf(
"%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, id.ServiceInterface, id.Group, id.Version, beanName))
assert.NoError(t, err)
_, err = common.ServiceMap.Register(id.ServiceInterface, protocol, &definition.UserProvider{})
assert.NoError(t, err)
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
return definition.BuildServiceDefinition(*service, url)
}
......@@ -19,18 +19,17 @@ package report
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
)
// MetadataReport is an interface of remote metadata report
type MetadataReport interface {
StoreProviderMetadata(*identifier.MetadataIdentifier, *definition.ServiceDefinition)
StoreConsumerMetadata(*identifier.MetadataIdentifier, map[string]string)
SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, *common.URL)
RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier)
StoreProviderMetadata(*identifier.MetadataIdentifier, string) error
StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error
SaveServiceMetadata(*identifier.ServiceMetadataIdentifier, common.URL) error
RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error
GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string
SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []*common.URL)
SaveSubscribedData(*identifier.SubscriberMetadataIdentifier, []common.URL) error
GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string
GetServiceDefinition(*identifier.MetadataIdentifier)
GetServiceDefinition(*identifier.MetadataIdentifier) string
}
......@@ -49,7 +49,7 @@ func NewMetadataServiceExporter(metadataService service.MetadataService) exporte
func (exporter *MetadataServiceExporter) Export() error {
if !exporter.IsExported() {
serviceConfig := config.NewServiceConfig("MetadataService", context.Background())
serviceConfig := config.NewServiceConfig(constant.SIMPLE_METADATA_SERVICE_NAME, context.Background())
serviceConfig.Protocol = constant.DEFAULT_PROTOCOL
serviceConfig.Protocols = map[string]*config.ProtocolConfig{
constant.DEFAULT_PROTOCOL: generateMetadataProtocol(),
......
......@@ -17,7 +17,6 @@
package inmemory
import (
"encoding/json"
"sync"
)
......@@ -34,6 +33,9 @@ import (
"github.com/apache/dubbo-go/metadata/service"
)
// version will be used by Version func
const version = "1.0.0"
// MetadataService is store and query the metadata info in memory when each service registry
type MetadataService struct {
service.BaseMetadataService
......@@ -53,13 +55,13 @@ func NewMetadataService() *MetadataService {
}
}
// comparator is defined as Comparator for skip list to compare the URL
type comparator common.URL
// Comparator is defined as Comparator for skip list to compare the URL
type Comparator common.URL
// Compare is defined as Comparator for skip list to compare the URL
func (c comparator) Compare(comp cm.Comparator) int {
func (c Comparator) Compare(comp cm.Comparator) int {
a := common.URL(c).String()
b := common.URL(comp.(comparator)).String()
b := common.URL(comp.(Comparator)).String()
switch {
case a > b:
return 1
......@@ -79,7 +81,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
logger.Debug(url.ServiceKey())
if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded {
mts.lock.RLock()
wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url))
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
mts.lock.RUnlock()
return false
......@@ -88,12 +90,12 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
}
mts.lock.Lock()
//double chk
wantedUrl := urlSet.(*skip.SkipList).Get(comparator(*url))
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
if len(wantedUrl) > 0 && wantedUrl[0] != nil {
mts.lock.Unlock()
return false
}
urlSet.(*skip.SkipList).Insert(comparator(*url))
urlSet.(*skip.SkipList).Insert(Comparator(*url))
mts.lock.Unlock()
return true
}
......@@ -102,7 +104,7 @@ func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
if value, loaded := targetMap.Load(url.ServiceKey()); loaded {
mts.lock.Lock()
value.(*skip.SkipList).Delete(comparator(*url))
value.(*skip.SkipList).Delete(Comparator(*url))
mts.lock.Unlock()
mts.lock.RLock()
defer mts.lock.RUnlock()
......@@ -118,9 +120,9 @@ func (mts *MetadataService) getAllService(services *sync.Map) *skip.SkipList {
services.Range(func(key, value interface{}) bool {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(comparator))
if url.GetParam(constant.INTERFACE_KEY, url.Path) != "MetadataService" {
skipList.Insert(comparator(url))
url := common.URL(urls.ByPosition(i).(Comparator))
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME {
skipList.Insert(Comparator(url))
}
}
return true
......@@ -135,9 +137,9 @@ func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey s
if loaded {
urls := serviceList.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(comparator))
url := common.URL(urls.ByPosition(i).(Comparator))
if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
skipList.Insert(comparator(url))
skipList.Insert(Comparator(url))
}
}
}
......@@ -182,9 +184,9 @@ func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
// //TODO:generate the service definition and store it
//}
sd := definition.BuildServiceDefinition(*service, url)
data, err := json.Marshal(sd)
data, err := sd.ToBytes()
if err != nil {
logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error: ", url, err)
logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err)
}
mts.serviceDefinitions.Store(url.ServiceKey(), string(data))
return nil
......@@ -221,12 +223,12 @@ func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string)
return v.(string), nil
}
// Version will return the version of metadata service
func (mts *MetadataService) Version() string {
return "1.0.0"
// RefreshMetadata will always return true because it will be implement by remote service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
return true
}
// Version will return the version of metadata service
func (mts *MetadataService) Reference() string {
return "MetadataService"
func (mts *MetadataService) Version() string {
return version
}
......@@ -18,10 +18,8 @@
package inmemory
import (
"context"
"fmt"
"testing"
"time"
)
import (
......@@ -33,29 +31,6 @@ import (
"github.com/apache/dubbo-go/metadata/definition"
)
type User struct {
Id string
Name string
Age int32
Time time.Time
}
type UserProvider struct {
}
func (u *UserProvider) GetUser(ctx context.Context, req []interface{}) (*User, error) {
rsp := User{"A001", "Alex Stocks", 18, time.Now()}
return &rsp, nil
}
func (u *UserProvider) Reference() string {
return "UserProvider"
}
func (u User) JavaClassName() string {
return "com.ikurento.user.User"
}
func TestMetadataService(t *testing.T) {
mts := NewMetadataService()
serviceName := "com.ikurento.user.UserProvider"
......@@ -111,7 +86,7 @@ func TestMetadataService(t *testing.T) {
list4, _ := mts.GetSubscribedURLs()
assert.Equal(t, uint64(0), list4.Len())
userProvider := &UserProvider{}
userProvider := &definition.UserProvider{}
common.ServiceMap.Register(serviceName, protocol, userProvider)
mts.PublishServiceDefinition(u)
expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," +
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"github.com/Workiva/go-datastructures/slice/skip"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report/delegate"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/metadata/service/inmemory"
)
// version will be used by Version func
const version = "1.0.0"
// MetadataService is a implement of metadata service which will delegate the remote metadata report
type MetadataService struct {
service.BaseMetadataService
inMemoryMetadataService *inmemory.MetadataService
exportedRevision atomic.String
subscribedRevision atomic.String
delegateReport *delegate.MetadataReport
}
// NewMetadataService will create a new remote MetadataService instance
func NewMetadataService() (*MetadataService, error) {
mr, err := delegate.NewMetadataReport()
if err != nil {
return nil, err
}
return &MetadataService{
inMemoryMetadataService: inmemory.NewMetadataService(),
delegateReport: mr,
}, nil
}
// setInMemoryMetadataService will replace the in memory metadata service by the specific param
func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) {
mts.inMemoryMetadataService = metadata
}
// ExportURL will be implemented by in memory service
func (mts *MetadataService) ExportURL(url common.URL) (bool, error) {
return true, nil
}
// UnexportURL
func (mts *MetadataService) UnexportURL(url common.URL) error {
smi := identifier.NewServiceMetadataIdentifier(url)
smi.Revision = mts.exportedRevision.Load()
return mts.delegateReport.RemoveServiceMetadata(smi)
}
// SubscribeURL will be implemented by in memory service
func (MetadataService) SubscribeURL(url common.URL) (bool, error) {
return true, nil
}
// UnsubscribeURL will be implemented by in memory service
func (MetadataService) UnsubscribeURL(url common.URL) error {
return nil
}
// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition
func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
sd := definition.BuildServiceDefinition(*service, url)
id := &identifier.MetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: interfaceName,
Version: url.GetParam(constant.VERSION_KEY, ""),
Group: url.GetParam(constant.GROUP_KEY, ""),
},
}
mts.delegateReport.StoreProviderMetadata(id, sd)
}
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
}
// GetExportedURLs will be implemented by in memory service
func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) {
return nil, nil
}
// GetSubscribedURLs will be implemented by in memory service
func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) {
return nil, nil
}
// GetServiceDefinition will be implemented by in memory service
func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
return "", nil
}
// GetServiceDefinitionByServiceKey will be implemented by in memory service
func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
return "", nil
}
// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
result := true
if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() {
mts.exportedRevision.Store(exportedRevision)
urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "")
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
iterator := urls.Iter(inmemory.Comparator{})
logger.Infof("urls length = %v", urls.Len())
for {
if !iterator.Next() {
break
}
url := iterator.Value().(inmemory.Comparator)
id := identifier.NewServiceMetadataIdentifier(common.URL(url))
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
}
}
if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() {
mts.subscribedRevision.Store(subscribedRevision)
urls, err := mts.inMemoryMetadataService.GetSubscribedURLs()
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
result = false
}
if urls != nil && urls.Len() > 0 {
id := &identifier.SubscriberMetadataIdentifier{
MetadataIdentifier: identifier.MetadataIdentifier{
Application: config.GetApplicationConfig().Name,
},
Revision: subscribedRevision,
}
if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
}
}
return result
}
// Version will return the remote service version
func (MetadataService) Version() string {
return version
}
// convertUrls will convert the skip list to slice
func convertUrls(list *skip.SkipList) []common.URL {
urls := make([]common.URL, list.Len())
iterator := list.Iter(inmemory.Comparator{})
for {
if iterator.Value() == nil {
break
}
url := iterator.Value().(inmemory.Comparator)
urls = append(urls, common.URL(url))
if !iterator.Next() {
break
}
}
return urls
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"fmt"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config/instance"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report"
"github.com/apache/dubbo-go/metadata/report/factory"
"github.com/apache/dubbo-go/metadata/service/inmemory"
)
var serviceMetadata = make(map[*identifier.ServiceMetadataIdentifier]common.URL, 4)
var subscribedMetadata = make(map[*identifier.SubscriberMetadataIdentifier][]common.URL, 4)
func getMetadataReportFactory() factory.MetadataReportFactory {
return &metadataReportFactory{}
}
type metadataReportFactory struct {
}
func (mrf *metadataReportFactory) CreateMetadataReport(*common.URL) report.MetadataReport {
return &metadataReport{}
}
type metadataReport struct {
}
func (metadataReport) StoreProviderMetadata(*identifier.MetadataIdentifier, string) error {
return nil
}
func (metadataReport) StoreConsumerMetadata(*identifier.MetadataIdentifier, string) error {
return nil
}
func (mr *metadataReport) SaveServiceMetadata(id *identifier.ServiceMetadataIdentifier, url common.URL) error {
logger.Infof("SaveServiceMetadata , url is %v", url)
serviceMetadata[id] = url
return nil
}
func (metadataReport) RemoveServiceMetadata(*identifier.ServiceMetadataIdentifier) error {
return nil
}
func (metadataReport) GetExportedURLs(*identifier.ServiceMetadataIdentifier) []string {
return nil
}
func (mr *metadataReport) SaveSubscribedData(id *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
logger.Infof("SaveSubscribedData, , url is %v", urls)
subscribedMetadata[id] = urls
return nil
}
func (metadataReport) GetSubscribedURLs(*identifier.SubscriberMetadataIdentifier) []string {
return nil
}
func (metadataReport) GetServiceDefinition(*identifier.MetadataIdentifier) string {
return ""
}
func TestMetadataService(t *testing.T) {
extension.SetMetadataReportFactory("mock", getMetadataReportFactory)
u, err := common.NewURL(fmt.Sprintf(
"mock://127.0.0.1:20000/?sync.report=true"))
assert.NoError(t, err)
instance.GetMetadataReportInstance(&u)
mts, err := NewMetadataService()
assert.NoError(t, err)
mts.setInMemoryMetadataService(mockInmemoryProc(t))
mts.RefreshMetadata("0.0.1", "0.0.1")
assert.Equal(t, 1, len(serviceMetadata))
assert.Equal(t, 1, len(subscribedMetadata))
}
func mockInmemoryProc(t *testing.T) *inmemory.MetadataService {
mts := inmemory.NewMetadataService()
serviceName := "com.ikurento.user.UserProvider"
group := "group1"
version := "0.0.1"
protocol := "dubbo"
beanName := "UserProvider"
u, err := common.NewURL(fmt.Sprintf(
"%v://127.0.0.1:20000/com.ikurento.user.UserProvider1?anyhost=true&"+
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+
"environment=dev&interface=%v&ip=192.168.56.1&methods=GetUser&module=dubbogo+user-info+server&org=ikurento.com&"+
"owner=ZX&pid=1447&revision=0.0.1&side=provider&timeout=3000&timestamp=1556509797245&group=%v&version=%v&bean.name=%v",
protocol, serviceName, group, version, beanName))
assert.NoError(t, err)
mts.ExportURL(u)
mts.SubscribeURL(u)
userProvider := &definition.UserProvider{}
common.ServiceMap.Register(serviceName, protocol, userProvider)
mts.PublishServiceDefinition(u)
expected := "{\"CanonicalName\":\"com.ikurento.user.UserProvider\",\"CodeSource\":\"\"," +
"\"Methods\":[{\"Name\":\"GetUser\",\"ParameterTypes\":[\"slice\"],\"ReturnType\":\"ptr\"," +
"\"Parameters\":null}],\"Types\":null}"
def1, _ := mts.GetServiceDefinition(serviceName, group, version)
assert.Equal(t, expected, def1)
serviceKey := definition.ServiceDescriperBuild(serviceName, group, version)
def2, _ := mts.GetServiceDefinitionByServiceKey(serviceKey)
assert.Equal(t, expected, def2)
return mts
}
......@@ -23,6 +23,7 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/config"
)
......@@ -49,6 +50,8 @@ type MetadataService interface {
GetServiceDefinition(interfaceName string, group string, version string) (string, error)
// GetServiceDefinition will get the target service info store in metadata by service key
GetServiceDefinitionByServiceKey(serviceKey string) (string, error)
// RefreshMetadata will refresh the metadata
RefreshMetadata(exportedRevision string, subscribedRevision string) bool
// Version will return the metadata service version
Version() string
}
......@@ -61,3 +64,8 @@ type BaseMetadataService struct {
func (mts *BaseMetadataService) ServiceName() (string, error) {
return config.GetApplicationConfig().Name, nil
}
// Version will return the version of metadata service
func (mts *BaseMetadataService) Reference() string {
return constant.SIMPLE_METADATA_SERVICE_NAME
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment