Skip to content
Snippets Groups Projects
service.go 7.68 KiB
Newer Older
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package remote

import (
flycash's avatar
flycash committed
	"sync"
flycash's avatar
flycash committed
)
flycash's avatar
flycash committed
import (
	"go.uber.org/atomic"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
flycash's avatar
flycash committed
	"github.com/apache/dubbo-go/common/extension"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/config"
	"github.com/apache/dubbo-go/metadata/definition"
	"github.com/apache/dubbo-go/metadata/identifier"
	"github.com/apache/dubbo-go/metadata/report/delegate"
	"github.com/apache/dubbo-go/metadata/service"
	"github.com/apache/dubbo-go/metadata/service/inmemory"
)

vito.he's avatar
vito.he committed
// version will be used by Version func
flycash's avatar
flycash committed
const (
	version = "1.0.0"
	remote  = "remote"
)
vito.he's avatar
vito.he committed

flycash's avatar
flycash committed
func init() {
flycash's avatar
flycash committed
	extension.SetMetadataService(remote, newMetadataService)
// MetadataService is a implement of metadata service which will delegate the remote metadata report
flycash's avatar
flycash committed
// This is singleton
type MetadataService struct {
	service.BaseMetadataService
	inMemoryMetadataService *inmemory.MetadataService
	exportedRevision        atomic.String
	subscribedRevision      atomic.String
	delegateReport          *delegate.MetadataReport
}

flycash's avatar
flycash committed
var (
	metadataServiceOnce     sync.Once
	metadataServiceInstance *MetadataService
)

flycash's avatar
flycash committed
// newMetadataService will create a new remote MetadataService instance
func newMetadataService() (service.MetadataService, error) {
flycash's avatar
flycash committed
	var err error
	metadataServiceOnce.Do(func() {
		var mr *delegate.MetadataReport
		mr, err = delegate.NewMetadataReport()
		if err != nil {
			return
		}
flycash's avatar
flycash committed
		// it will never return error
		inms, _ := inmemory.NewMetadataService()
flycash's avatar
flycash committed
		metadataServiceInstance = &MetadataService{
flycash's avatar
flycash committed
			BaseMetadataService:     service.NewBaseMetadataService(config.GetApplicationConfig().Name),
			inMemoryMetadataService: inms.(*inmemory.MetadataService),
flycash's avatar
flycash committed
			delegateReport:          mr,
		}
	})
	return metadataServiceInstance, err
}

// setInMemoryMetadataService will replace the in memory metadata service by the specific param
func (mts *MetadataService) setInMemoryMetadataService(metadata *inmemory.MetadataService) {
	mts.inMemoryMetadataService = metadata
}

// ExportURL will be implemented by in memory service
haohongfan's avatar
haohongfan committed
func (mts *MetadataService) ExportURL(url *common.URL) (bool, error) {
flycash's avatar
flycash committed
	return mts.inMemoryMetadataService.ExportURL(url)
flycash's avatar
flycash committed
// UnexportURL remove @url's metadata
haohongfan's avatar
haohongfan committed
func (mts *MetadataService) UnexportURL(url *common.URL) error {
	smi := identifier.NewServiceMetadataIdentifier(url)
	smi.Revision = mts.exportedRevision.Load()
	return mts.delegateReport.RemoveServiceMetadata(smi)
}

// SubscribeURL will be implemented by in memory service
haohongfan's avatar
haohongfan committed
func (mts *MetadataService) SubscribeURL(url *common.URL) (bool, error) {
flycash's avatar
flycash committed
	return mts.inMemoryMetadataService.SubscribeURL(url)
}

// UnsubscribeURL will be implemented by in memory service
haohongfan's avatar
haohongfan committed
func (mts *MetadataService) UnsubscribeURL(url *common.URL) error {
flycash's avatar
flycash committed
	return mts.UnsubscribeURL(url)
}

// PublishServiceDefinition will call remote metadata's StoreProviderMetadata to store url info and service definition
haohongfan's avatar
haohongfan committed
func (mts *MetadataService) PublishServiceDefinition(url *common.URL) error {
	interfaceName := url.GetParam(constant.INTERFACE_KEY, "")
	isGeneric := url.GetParamBool(constant.GENERIC_KEY, false)
	if len(interfaceName) > 0 && !isGeneric {
		sv := common.ServiceMap.GetServiceByServiceKey(url.Protocol, url.ServiceKey())
flycash's avatar
flycash committed
		sd := definition.BuildServiceDefinition(*sv, url)
		id := &identifier.MetadataIdentifier{
			BaseMetadataIdentifier: identifier.BaseMetadataIdentifier{
				ServiceInterface: interfaceName,
				Version:          url.GetParam(constant.VERSION_KEY, ""),
flycash's avatar
flycash committed
				// Group:            url.GetParam(constant.GROUP_KEY, constant.SERVICE_DISCOVERY_DEFAULT_GROUP),
flycash's avatar
flycash committed
				Group: url.GetParam(constant.GROUP_KEY, constant.DUBBO),
flycash's avatar
flycash committed
				Side:  url.GetParam(constant.SIDE_KEY, "provider"),
			},
		}
		mts.delegateReport.StoreProviderMetadata(id, sd)
flycash's avatar
flycash committed
		return nil
	}
	logger.Errorf("publishProvider interfaceName is empty . providerUrl:%v ", url)
	return nil
}

// GetExportedURLs will be implemented by in memory service
flycash's avatar
flycash committed
func (mts *MetadataService) GetExportedURLs(serviceInterface string, group string, version string, protocol string) ([]interface{}, error) {
flycash's avatar
flycash committed
	return mts.inMemoryMetadataService.GetExportedURLs(serviceInterface, group, version, protocol)
}

// GetSubscribedURLs will be implemented by in memory service
haohongfan's avatar
haohongfan committed
func (mts *MetadataService) GetSubscribedURLs() ([]*common.URL, error) {
flycash's avatar
flycash committed
	return mts.inMemoryMetadataService.GetSubscribedURLs()
}

// GetServiceDefinition will be implemented by in memory service
flycash's avatar
flycash committed
func (mts *MetadataService) GetServiceDefinition(interfaceName string, group string, version string) (string, error) {
	return mts.inMemoryMetadataService.GetServiceDefinition(interfaceName, group, version)
}

// GetServiceDefinitionByServiceKey will be implemented by in memory service
flycash's avatar
flycash committed
func (mts *MetadataService) GetServiceDefinitionByServiceKey(serviceKey string) (string, error) {
	return mts.inMemoryMetadataService.GetServiceDefinitionByServiceKey(serviceKey)
}

// RefreshMetadata will refresh the exported & subscribed metadata to remote metadata report from the inmemory metadata service
flycash's avatar
flycash committed
func (mts *MetadataService) RefreshMetadata(exportedRevision string, subscribedRevision string) (bool, error) {
	if len(exportedRevision) != 0 && exportedRevision != mts.exportedRevision.Load() {
		mts.exportedRevision.Store(exportedRevision)
		urls, err := mts.inMemoryMetadataService.GetExportedURLs(constant.ANY_VALUE, "", "", "")
		if err != nil {
vito.he's avatar
vito.he committed
			logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
flycash's avatar
flycash committed
			return false, err
flycash's avatar
flycash committed
		logger.Infof("urls length = %v", len(urls))
flycash's avatar
flycash committed
		for _, ui := range urls {

			u, err := common.NewURL(ui.(string))
			if err != nil {
				logger.Errorf("this is not valid url string: %s ", ui.(string))
				continue
			}
			id := identifier.NewServiceMetadataIdentifier(u)
			id.Revision = mts.exportedRevision.Load()
flycash's avatar
flycash committed
			if err := mts.delegateReport.SaveServiceMetadata(id, u); err != nil {
vito.he's avatar
vito.he committed
				logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
flycash's avatar
flycash committed
				return false, err
			}
		}
	}

	if len(subscribedRevision) != 0 && subscribedRevision != mts.subscribedRevision.Load() {
		mts.subscribedRevision.Store(subscribedRevision)
		urls, err := mts.inMemoryMetadataService.GetSubscribedURLs()
		if err != nil {
vito.he's avatar
vito.he committed
			logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %v+", err)
flycash's avatar
flycash committed
			return false, err
flycash's avatar
flycash committed
		if urls != nil && len(urls) > 0 {
			id := &identifier.SubscriberMetadataIdentifier{
				MetadataIdentifier: identifier.MetadataIdentifier{
					Application: config.GetApplicationConfig().Name,
				},
				Revision: subscribedRevision,
			}
flycash's avatar
flycash committed
			if err := mts.delegateReport.SaveSubscribedData(id, urls); err != nil {
vito.he's avatar
vito.he committed
				logger.Errorf("Error occur when execute remote.MetadataService.RefreshMetadata, error message is %+v", err)
flycash's avatar
flycash committed
				return false, err
flycash's avatar
flycash committed
	return true, nil
}

// Version will return the remote service version
flycash's avatar
flycash committed
func (MetadataService) Version() (string, error) {
	return version, nil