Skip to content
Snippets Groups Projects
service.go 8.47 KiB
Newer Older
vito.he's avatar
vito.he committed
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package inmemory

import (
flycash's avatar
flycash committed
	"sort"
vito.he's avatar
vito.he committed
	"sync"
)

import (
vito.he's avatar
vito.he committed
	cm "github.com/Workiva/go-datastructures/common"
	"github.com/Workiva/go-datastructures/slice/skip"
vito.he's avatar
vito.he committed
)

import (
	"github.com/apache/dubbo-go/common"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/common/constant"
flycash's avatar
flycash committed
	"github.com/apache/dubbo-go/common/extension"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/common/logger"
flycash's avatar
flycash committed
	"github.com/apache/dubbo-go/config"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/metadata/definition"
vito.he's avatar
vito.he committed
	"github.com/apache/dubbo-go/metadata/service"
)

flycash's avatar
flycash committed
// version is the value returned by MetadataService.Version.
// local is the key under which init registers NewMetadataService with the
// extension package.
const (
	version = "1.0.0"
	local   = "local"
)

flycash's avatar
flycash committed
func init() {
flycash's avatar
flycash committed
	extension.SetMetadataService(local, NewMetadataService)
vito.he's avatar
vito.he committed
// MetadataService is store and query the metadata info in memory when each service registry
vito.he's avatar
vito.he committed
type MetadataService struct {
	service.BaseMetadataService
vito.he's avatar
vito.he committed
	exportedServiceURLs   *sync.Map
	subscribedServiceURLs *sync.Map
vito.he's avatar
vito.he committed
	serviceDefinitions    *sync.Map
vito.he's avatar
vito.he committed
	lock                  *sync.RWMutex
}

// metadataServiceInstance is the single MetadataService returned by
// NewMetadataService; metadataServiceInitOnce ensures it is built only once.
var (
	metadataServiceInstance *MetadataService
	metadataServiceInitOnce sync.Once
)

vito.he's avatar
vito.he committed
// NewMetadataService: initiate a metadata service
// it should be singleton
flycash's avatar
flycash committed
func NewMetadataService() (service.MetadataService, error) {
	metadataServiceInitOnce.Do(func() {
		metadataServiceInstance = &MetadataService{
			BaseMetadataService:   service.NewBaseMetadataService(config.GetApplicationConfig().Name),
			exportedServiceURLs:   &sync.Map{},
			subscribedServiceURLs: &sync.Map{},
			serviceDefinitions:    &sync.Map{},
			lock:                  &sync.RWMutex{},
		}
	})
	return metadataServiceInstance, nil
vito.he's avatar
vito.he committed
}

// Comparator is a common.URL wrapped in a distinct type so that it can carry
// the Compare method required of entries stored in the skip lists.
type Comparator common.URL
vito.he's avatar
vito.he committed

// Compare is defined as Comparator for skip list to compare the URL
func (c Comparator) Compare(comp cm.Comparator) int {
vito.he's avatar
vito.he committed
	a := common.URL(c).String()
	b := common.URL(comp.(Comparator)).String()
vito.he's avatar
vito.he committed
	switch {
vito.he's avatar
vito.he committed
	case a > b:
vito.he's avatar
vito.he committed
		return 1
vito.he's avatar
vito.he committed
	case a < b:
vito.he's avatar
vito.he committed
		return -1
	default:
		return 0
	}
}

vito.he's avatar
vito.he committed
// addURL will add URL in memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) addURL(targetMap *sync.Map, url *common.URL) bool {
vito.he's avatar
vito.he committed
	var (
		urlSet interface{}
		loaded bool
	)
vito.he's avatar
vito.he committed
	logger.Debug(url.ServiceKey())
vito.he's avatar
vito.he committed
	if urlSet, loaded = targetMap.LoadOrStore(url.ServiceKey(), skip.New(uint64(0))); loaded {
vito.he's avatar
vito.he committed
		mts.lock.RLock()
		wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
vito.he's avatar
vito.he committed
		if len(wantedUrl) > 0 && wantedUrl[0] != nil {
vito.he's avatar
vito.he committed
			mts.lock.RUnlock()
vito.he's avatar
vito.he committed
			return false
		}
vito.he's avatar
vito.he committed
		mts.lock.RUnlock()
vito.he's avatar
vito.he committed
	}
vito.he's avatar
vito.he committed
	mts.lock.Lock()
flycash's avatar
flycash committed
	// double chk
	wantedUrl := urlSet.(*skip.SkipList).Get(Comparator(*url))
vito.he's avatar
vito.he committed
	if len(wantedUrl) > 0 && wantedUrl[0] != nil {
vito.he's avatar
vito.he committed
		mts.lock.Unlock()
		return false
	}
	urlSet.(*skip.SkipList).Insert(Comparator(*url))
vito.he's avatar
vito.he committed
	mts.lock.Unlock()
vito.he's avatar
vito.he committed
	return true
}

vito.he's avatar
vito.he committed
// removeURL is used to remove specified url
vito.he's avatar
vito.he committed
func (mts *MetadataService) removeURL(targetMap *sync.Map, url *common.URL) {
vito.he's avatar
vito.he committed
	if value, loaded := targetMap.Load(url.ServiceKey()); loaded {
vito.he's avatar
vito.he committed
		mts.lock.Lock()
		value.(*skip.SkipList).Delete(Comparator(*url))
vito.he's avatar
vito.he committed
		mts.lock.Unlock()
		mts.lock.RLock()
		defer mts.lock.RUnlock()
vito.he's avatar
vito.he committed
		if value.(*skip.SkipList).Len() == 0 {
vito.he's avatar
vito.he committed
			targetMap.Delete(url.ServiceKey())
		}
	}
}

vito.he's avatar
vito.he committed
// getAllService can return all the exportedUrlString except for metadataService
flycash's avatar
flycash committed
func (mts *MetadataService) getAllService(services *sync.Map) []common.URL {
	// using skip list to dedup and sorting
	res := make([]common.URL, 0)
vito.he's avatar
vito.he committed
	services.Range(func(key, value interface{}) bool {
vito.he's avatar
vito.he committed
		urls := value.(*skip.SkipList)
		for i := uint64(0); i < urls.Len(); i++ {
			url := common.URL(urls.ByPosition(i).(Comparator))
flycash's avatar
flycash committed
			if url.GetParam(constant.INTERFACE_KEY, url.Path) != constant.METADATA_SERVICE_NAME {
flycash's avatar
flycash committed
				res = append(res, url)
vito.he's avatar
vito.he committed
			}
vito.he's avatar
vito.he committed
		}
vito.he's avatar
vito.he committed
		return true
	})
flycash's avatar
flycash committed
	sort.Sort(common.URLSlice(res))
	return res
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// getSpecifiedService can return specified service url by serviceKey
flycash's avatar
flycash committed
func (mts *MetadataService) getSpecifiedService(services *sync.Map, serviceKey string, protocol string) []common.URL {
	res := make([]common.URL, 0)
vito.he's avatar
vito.he committed
	serviceList, loaded := services.Load(serviceKey)
vito.he's avatar
vito.he committed
	if loaded {
vito.he's avatar
vito.he committed
		urls := serviceList.(*skip.SkipList)
		for i := uint64(0); i < urls.Len(); i++ {
			url := common.URL(urls.ByPosition(i).(Comparator))
			if len(protocol) == 0 || protocol == constant.ANY_VALUE || url.Protocol == protocol || url.GetParam(constant.PROTOCOL_KEY, "") == protocol {
flycash's avatar
flycash committed
				res = append(res, url)
vito.he's avatar
vito.he committed
			}
vito.he's avatar
vito.he committed
		}
flycash's avatar
flycash committed
		sort.Stable(common.URLSlice(res))
vito.he's avatar
vito.he committed
	}
flycash's avatar
flycash committed
	return res
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// ExportURL can store the in memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) ExportURL(url common.URL) (bool, error) {
	return mts.addURL(mts.exportedServiceURLs, &url), nil
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// UnexportURL can remove the url store in memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) UnexportURL(url common.URL) error {
vito.he's avatar
vito.he committed
	mts.removeURL(mts.exportedServiceURLs, &url)
vito.he's avatar
vito.he committed
	return nil
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// SubscribeURL can store the in memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) SubscribeURL(url common.URL) (bool, error) {
	return mts.addURL(mts.subscribedServiceURLs, &url), nil
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// UnsubscribeURL can remove the url store in memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) UnsubscribeURL(url common.URL) error {
vito.he's avatar
vito.he committed
	mts.removeURL(mts.subscribedServiceURLs, &url)
vito.he's avatar
vito.he committed
	return nil
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// PublishServiceDefinition: publish url's service metadata info, and write into memory
vito.he's avatar
vito.he committed
func (mts *MetadataService) PublishServiceDefinition(url common.URL) error {
vito.he's avatar
vito.he committed
	interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
	isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
	if len(interfaceName) > 0 && !isGeneric {
flycash's avatar
flycash committed
		// judge is consumer or provider
		// side := url.GetParam(constant.SIDE_KEY, "")
		// var service event.RPCService
vito.he's avatar
vito.he committed
		service := common.ServiceMap.GetService(url.Protocol, url.GetParam(constant.BEAN_NAME_KEY, url.Service()))
flycash's avatar
flycash committed
		// if side == event.RoleType(event.CONSUMER).Role() {
vito.he's avatar
vito.he committed
		//	//TODO:generate the service definition and store it
		//
flycash's avatar
flycash committed
		// } else if side == event.RoleType(event.PROVIDER).Role() {
vito.he's avatar
vito.he committed
		//	//TODO:generate the service definition and store it
flycash's avatar
flycash committed
		// }
vito.he's avatar
vito.he committed
		sd := definition.BuildServiceDefinition(*service, url)
		data, err := sd.ToBytes()
vito.he's avatar
vito.he committed
		if err != nil {
			logger.Errorf("publishProvider getServiceDescriptor error. providerUrl:%v , error:%v ", url, err)
vito.he's avatar
vito.he committed
		}
vito.he's avatar
vito.he committed
		mts.serviceDefinitions.Store(url.ServiceKey(), string(data))
		return nil
vito.he's avatar
vito.he committed
	}
vito.he's avatar
vito.he committed
	logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
	return nil
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// GetExportedURLs get all exported urls
flycash's avatar
flycash committed
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
vito.he's avatar
vito.he committed
	if serviceInterface == constant.ANY_VALUE {
flycash's avatar
flycash committed
		return service.ConvertURLArrToIntfArr(mts.getAllService(mts.exportedServiceURLs)), nil
vito.he's avatar
vito.he committed
	} else {
		serviceKey := definition.ServiceDescriperBuild(serviceInterface, group, version)
flycash's avatar
flycash committed
		return service.ConvertURLArrToIntfArr(mts.getSpecifiedService(mts.exportedServiceURLs, serviceKey, protocol)), nil
vito.he's avatar
vito.he committed
	}
}

// GetSubscribedURLs get all subscribedUrl
flycash's avatar
flycash committed
func (mts *MetadataService) GetSubscribedURLs() ([]common.URL, error) {
vito.he's avatar
vito.he committed
	return mts.getAllService(mts.subscribedServiceURLs), nil
}

// GetServiceDefinition returns the service definition stored under the key
// built from interfaceName, group and version. It returns an empty string
// when no definition has been stored for that key. The error is always nil.
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
	serviceKey := definition.ServiceDescriperBuild(interfaceName, group, version)
	v, _ := mts.serviceDefinitions.Load(serviceKey)
	// Comma-ok assertion: a missing key yields a nil interface, which a plain
	// v.(string) would turn into a panic.
	def, _ := v.(string)
	return def, nil
}

// GetServiceDefinition can get service definition by serviceKey
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
	v, _ := mts.serviceDefinitions.Load(serviceKey)
	return v.(string), nil
vito.he's avatar
vito.he committed
}

// RefreshMetadata will always return true because it will be implement by remote service
flycash's avatar
flycash committed
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
	return true, nil
vito.he's avatar
vito.he committed
}

vito.he's avatar
vito.he committed
// Version will return the version of metadata service
flycash's avatar
flycash committed
func (mts *MetadataService) Version() (string, error) {
	return version, nil
vito.he's avatar
vito.he committed
}