Skip to content
Snippets Groups Projects
in_memory.go 5.99 KiB
Newer Older
vito.he's avatar
vito.he committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package inmemory

import (
	"sync"
)

import (
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/common/logger"
vito.he's avatar
vito.he committed
	"github.com/emirpasic/gods/sets"
	"github.com/emirpasic/gods/sets/treeset"
vito.he's avatar
vito.he committed
	"github.com/emirpasic/gods/utils"
vito.he's avatar
vito.he committed
)

import (
	"github.com/apache/dubbo-go/common"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/metadata/definition"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/metadata/service"
)

// InMemoryMetadataService is store and query the metadata info in memory when each service registry
type MetadataService struct {
	service.BaseMetadataService
vito.he's avatar
vito.he committed
	exportedServiceURLs   *sync.Map
	subscribedServiceURLs *sync.Map
	lock                  *sync.RWMutex
}

// NewMetadataService: initiate a metadata service
func NewMetadataService() *MetadataService {
	return &MetadataService{
		exportedServiceURLs:   new(sync.Map),
		subscribedServiceURLs: new(sync.Map),
		lock:                  new(sync.RWMutex),
	}
vito.he's avatar
vito.he committed
}

// urlComparator: defined as utils.Comparator for treeset to compare the URL
func urlComparator(a, b interface{}) int {
vito.he's avatar
vito.he committed
	url1 := a.(*common.URL)
	url2 := b.(*common.URL)
vito.he's avatar
vito.he committed
	switch {
	case url1.String() > url2.String():
		return 1
	case url1.String() < url2.String():
		return -1
	default:
		return 0
	}
}

// addURL: add URL in memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
vito.he's avatar
vito.he committed
	var (
		urlSet interface{}
		loaded bool
	)
vito.he's avatar
vito.he committed
	logger.Debug(url.ServiceKey())
vito.he's avatar
vito.he committed
	if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), treeset.NewWith(urlComparator)); loaded {
vito.he's avatar
vito.he committed
		mts.lock.RLock()
		if urlSet.(*treeset.Set).Contains(url) {
			mts.lock.RUnlock()
vito.he's avatar
vito.he committed
			return false
		}
vito.he's avatar
vito.he committed
		mts.lock.RUnlock()
vito.he's avatar
vito.he committed
	}
vito.he's avatar
vito.he committed
	mts.lock.Lock()
	//double chk
	if urlSet.(*treeset.Set).Contains(url) {
		mts.lock.Unlock()
		return false
	}
	urlSet.(*treeset.Set).Add(url)
	mts.lock.Unlock()
vito.he's avatar
vito.he committed
	return true
}

vito.he's avatar
vito.he committed
// removeURL: used to remove specified url
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
vito.he's avatar
vito.he committed
	if value, loaded := targetMap.Load(url.ServiceKey()); loaded {
vito.he's avatar
vito.he committed
		mts.lock.Lock()
		value.(*treeset.Set).Remove(url)
		mts.lock.Unlock()
		mts.lock.RLock()
		defer mts.lock.RUnlock()
		if value.(*treeset.Set).Empty() {
vito.he's avatar
vito.he committed
			targetMap.Delete(url.ServiceKey())
		}
	}
}

vito.he's avatar
vito.he committed
// getAllService: return all the exportedUrlString except for metadataService
func (mts *MetadataService) getAllService(services *sync.Map) sets.Set {
	sets := treeset.NewWith(utils.StringComparator)
	services.Range(func(key, value interface{}) bool {
		urls := value.(*treeset.Set)
		urls.Each(func(index int, value interface{}) {
			url := value.(*common.URL)
			if url.GetParam(constant.INTERFACE_KEY, url.Path) != "MetadataService" {
				sets.Add(url.String())
			}
		})
		return true
	})
	return sets
}

// getSpecifiedService: return specified service url by serviceKey
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) sets.Set {
	targetSets := treeset.NewWith(utils.StringComparator)
	serviceSet, loaded := services.Load(serviceKey)
	if loaded {
		serviceSet.(*treeset.Set).Each(func(index int, value interface{}) {
			url := value.(*common.URL)
			if len(protocol) == 0 || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
				targetSets.Add(value.(*common.URL).String())
			}
		})
	}
	return targetSets
}

vito.he's avatar
vito.he committed
// ExportURL: store the in memory treeset
func (mts *MetadataService) ExportURL(url common.URL) bool {
vito.he's avatar
vito.he committed
	return mts.addURL(mts.exportedServiceURLs, &url)
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// UnexportURL: remove the url store in memory treeset
func (mts *MetadataService) UnexportURL(url common.URL) {
	mts.removeURL(mts.exportedServiceURLs, &url)
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// SubscribeURL...
vito.he's avatar
vito.he committed
func (mts *MetadataService) SubscribeURL(url common.URL) bool {
vito.he's avatar
vito.he committed
	return mts.addURL(mts.subscribedServiceURLs, &url)
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// UnsubscribeURL...
func (mts *MetadataService) UnsubscribeURL(url common.URL) {
	mts.removeURL(mts.subscribedServiceURLs, &url)
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// PublishServiceDefinition: publish url's service metadata info, and write into memory
vito.he's avatar
vito.he committed
func (MetadataService) PublishServiceDefinition(url common.URL) {
vito.he's avatar
vito.he committed
	interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
	isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
	if len(interfaceName) > 0 && !isGeneric {
		//judge is consumer or provider
		role := url.GetParam(constant.SIDE_KEY, "")
		//var service common.RPCService
		if role == common.RoleType(common.CONSUMER).Role() {

			//TODO:BOSS FANG
		} else if role == common.RoleType(common.PROVIDER).Role() {
			//TODO:BOSS FANG
		}

	}
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// GetExportedURLs get all exported urls
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) sets.Set {
	if serviceInterface == constant.ANY_VALUE {
		return mts.getAllService(mts.exportedServiceURLs)
	} else {
		serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
		return mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol)
	}
}

// GetSubscribedURLs get all subscribedUrl
func (mts *MetadataService) GetSubscribedURLs() sets.Set {
	return mts.getAllService(mts.subscribedServiceURLs)
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
func (MetadataService) GetServiceDefinition(interfaceName string, group string, version string) string {
vito.he's avatar
vito.he committed
	panic("implement me")
}

func (MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) string {
	panic("implement me")
}