Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inmemory
import (
cm "github.com/Workiva/go-datastructures/common"
"github.com/Workiva/go-datastructures/slice/skip"
// version will be used by Version func
const version = "1.0.0"
// MetadataService is store and query the metadata info in memory when each service registry
exportedServiceURLs *sync.Map
subscribedServiceURLs *sync.Map
lock *sync.RWMutex
}
// NewMetadataService: initiate a metadata service
func NewMetadataService() *MetadataService {
return &MetadataService{
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
// Comparator is defined as Comparator for skip list to compare the URL
type Comparator common.URL
// Compare is defined as Comparator for skip list to compare the URL
func (c Comparator) Compare(comp cm.Comparator) int {
b := common.URL(comp.(Comparator)).String()
func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded {
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
urlSet.(*skip.SkipList).Insert(Comparator(*url))
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
value.(*skip.SkipList).Delete(Comparator(*url))
mts.lock.Unlock()
mts.lock.RLock()
defer mts.lock.RUnlock()
// getAllService can return all the exportedUrlString except for metadataService
func (mts *MetadataService) getAllService(services *sync.Map) []common.URL {
// using skip list to dedup and sorting
res := make([]common.URL, 0)
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.SIMPLE_METADATA_SERVICE_NAME {
// getSpecifiedService can return specified service url by serviceKey
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) []common.URL {
res := make([]common.URL, 0)
urls := serviceList.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
url := common.URL(urls.ByPosition(i).(Comparator))
if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
func (mts *MetadataService) ExportURL(url common.URL) (bool, error) {
return mts.addURL(mts.exportedServiceURLs, &url), nil
func (mts *MetadataService) UnexportURL(url common.URL) error {
func (mts *MetadataService) SubscribeURL(url common.URL) (bool, error) {
return mts.addURL(mts.subscribedServiceURLs, &url), nil
func (mts *MetadataService) UnsubscribeURL(url common.URL) error {
// PublishServiceDefinition: publish url's service metadata info, and write into memory
func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
// judge is consumer or provider
// side := url.GetParam(constant.SIDE_KEY, "")
// var service event.RPCService
service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
// //TODO:generate the service definition and store it
//
data, err := sd.ToBytes()
logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err)
mts.serviceDefinitions.Store(url.ServiceKey(), string(data))
return nil
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]common.URL, error) {
} else {
serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol), nil
}
}
// GetSubscribedURLs get all subscribedUrl
func (mts *MetadataService) GetSubscribedURLs() ([]common.URL, error) {
return mts.getAllService(mts.subscribedServiceURLs), nil
}
// GetServiceDefinition can get service definition by interfaceName, group and version
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
serviceKey := definition.ServiceDescriperBuild(interfaceName, group, version)
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
}
// GetServiceDefinition can get service definition by serviceKey
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
// RefreshMetadata will always return true because it will be implement by remote service
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) bool {
return true
func (mts *MetadataService) Version() string {