Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package remote
import (
"github.com/Workiva/go-datastructures/slice/skip"
"go.uber.org/atomic"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/config"
"github.com/apache/dubbo-go/metadata/definition"
"github.com/apache/dubbo-go/metadata/identifier"
"github.com/apache/dubbo-go/metadata/report/delegate"
"github.com/apache/dubbo-go/metadata/service"
"github.com/apache/dubbo-go/metadata/service/inmemory"
)
// version will be used by Version func
const version = "1.0.0"
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// MetadataService is a implement of metadata service which will delegate the remote metadata report
type MetadataService struct {
service.BaseMetadataService
inMemoryMetadataService *inmemory.MetadataService
exportedRevision atomic.String
subscribedRevision atomic.String
delegateReport *delegate.MetadataReport
}
// NewMetadataService will create a new remote MetadataService instance
func NewMetadataService() (*MetadataService, error) {
mr, err := delegate.NewMetadataReport()
if err != nil {
return nil, err
}
return &MetadataService{
inMemoryMetadataService: inmemory.NewMetadataService(),
delegateReport: mr,
}, nil
}
// setInMemoryMetadataService will replace the in memory metadata service by the specific param
func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) {
mts.inMemoryMetadataService = metadata
}
// ExportURL will be implemented by in memory service
func (mts *MetadataService) ExportURL(url common.URL) (bool, error) {
return true, nil
}
// UnexportURL
func (mts *MetadataService) UnexportURL(url common.URL) error {
smi := identifier.NewServiceMetadataIdentifier(url)
smi.Revision = mts.exportedRevision.Load()
return mts.delegateReport.RemoveServiceMetadata(smi)
}
// SubscribeURL will be implemented by in memory service
func (MetadataService) SubscribeURL(url common.URL) (bool, error) {
return true, nil
}
// UnsubscribeURL will be implemented by in memory service
func (MetadataService) UnsubscribeURL(url common.URL) error {
return nil
}
// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition
func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
sd := definition.BuildServiceDefinition(*service, url)
id := &identifier.MetadataIdentifier{
BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
ServiceInterface: interfaceName,
Version: url.GetParam(constant.VERSION_KEY, ""),
Group: url.GetParam(constant.GROUP_KEY, ""),
},
}
mts.delegateReport.StoreProviderMetadata(id, sd)
}
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
}
// GetExportedURLs will be implemented by in memory service
func (MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) (*skip.SkipList, error) {
return nil, nil
}
// GetSubscribedURLs will be implemented by in memory service
func (MetadataService) GetSubscribedURLs() (*skip.SkipList, error) {
return nil, nil
}
// GetServiceDefinition will be implemented by in memory service
func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
return "", nil
}
// GetServiceDefinitionByServiceKey will be implemented by in memory service
func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
return "", nil
}
// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
result := true
if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() {
mts.exportedRevision.Store(exportedRevision)
urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "")
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
iterator := urls.Iter(inmemory.Comparator{})
logger.Infof("urls length = %v", urls.Len())
for {
if !iterator.Next() {
break
}
url := iterator.Value().(inmemory.Comparator)
id := identifier.NewServiceMetadataIdentifier(common.URL(url))
id.Revision = mts.exportedRevision.Load()
if err := mts.delegateReport.SaveServiceMetadata(id, common.URL(url)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
}
}
if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() {
mts.subscribedRevision.Store(subscribedRevision)
urls, err := mts.inMemoryMetadataService.GetSubscribedURLs()
if err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
result = false
}
if urls != nil && urls.Len() > 0 {
id := &identifier.SubscriberMetadataIdentifier{
MetadataIdentifier: identifier.MetadataIdentifier{
Application: config.GetApplicationConfig().Name,
},
Revision: subscribedRevision,
}
if err := mts.delegateReport.SaveSubscribedData(id, convertUrls(urls)); err != nil {
logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
result = false
}
}
}
return result
}
// Version will return the remote service version
func (MetadataService) Version() string {
}
// convertUrls will convert the skip list to slice
func convertUrls(list *skip.SkipList) []common.URL {
urls := make([]common.URL, list.Len())
iterator := list.Iter(inmemory.Comparator{})
for {
if iterator.Value() == nil {
break
}
url := iterator.Value().(inmemory.Comparator)
urls = append(urls, common.URL(url))
if !iterator.Next() {
break
}
}
return urls
}