Newer
Older
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package inmemory
import (
// version will be used by Version func
const (
version = "1.0.0"
local = "local"
)
extension.SetMetadataService(local, NewMetadataService)
// MetadataService is store and query the metadata info in memory when each service registry
exportedServiceURLs *sync.Map
subscribedServiceURLs *sync.Map
var (
metadataServiceInstance *MetadataService
metadataServiceInitOnce sync.Once
)
func NewMetadataService() (service.MetadataService, error) {
metadataServiceInitOnce.Do(func() {
metadataServiceInstance = &MetadataService{
BaseMetadataService: service.NewBaseMetadataService(config.GetApplicationConfig().Name),
exportedServiceURLs: &sync.Map{},
subscribedServiceURLs: &sync.Map{},
serviceDefinitions: &sync.Map{},
lock: &sync.RWMutex{},
}
})
return metadataServiceInstance, nil
func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded {
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
mts.lock.Unlock()
mts.lock.RLock()
defer mts.lock.RUnlock()
// getAllService can return all the exportedUrlString except for metadataService
func (mts *MetadataService) getAllService(services *sync.Map) []*common.URL {
urls := value.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
if url.Service() != constant.METADATA_SERVICE_NAME {
// getSpecifiedService can return specified service url by serviceKey
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) []*common.URL {
var res []*common.URL
urls := serviceList.(*skip.SkipList)
for i := uint64(0); i < urls.Len(); i++ {
if len(protocol) == 0 || protocol == constant.ANY_VALUE || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) {
return mts.addURL(mts.exportedServiceURLs, url), nil
func (mts *MetadataService) UnexportURL(url *common.URL) error {
mts.removeURL(mts.exportedServiceURLs, url)
func (mts *MetadataService) SubscribeURL(url *common.URL) (bool, error) {
return mts.addURL(mts.subscribedServiceURLs, url), nil
func (mts *MetadataService) UnsubscribeURL(url *common.URL) error {
mts.removeURL(mts.subscribedServiceURLs, url)
// PublishServiceDefinition: publish url's service metadata info, and write into memory
func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
if len(interfaceName) > 0 && !isGeneric {
tmpService := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
sd := definition.BuildServiceDefinition(*tmpService, url)
data, err := sd.ToBytes()
logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err)
mts.serviceDefinitions.Store(url.ServiceKey(), string(data))
return nil
logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
return nil
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
return service.ConvertURLArrToIntfArr(mts.getAllService(mts.exportedServiceURLs)), nil
} else {
serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
return service.ConvertURLArrToIntfArr(mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol)), nil
}
}
// GetSubscribedURLs get all subscribedUrl
func (mts *MetadataService) GetSubscribedURLs() ([]*common.URL, error) {
return mts.getAllService(mts.subscribedServiceURLs), nil
}
// GetServiceDefinition can get service definition by interfaceName, group and version
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
serviceKey := definition.ServiceDescriperBuild(interfaceName, group, version)
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
}
// GetServiceDefinition can get service definition by serviceKey
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
v, _ := mts.serviceDefinitions.Load(serviceKey)
return v.(string), nil
// RefreshMetadata will always return true because it will be implement by remote service
func (mts *MetadataService) RefreshMetadata(string, string) (bool, error) {
func (mts *MetadataService) Version() (string, error) {
return version, nil