Skip to content
Snippets Groups Projects
delegate_report.go 9.64 KiB
Newer Older
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package delegate

import (
	"encoding/json"
	"sync"
	"time"
)

import (
	"github.com/go-co-op/gocron"
	"go.uber.org/atomic"
)

import (
	"github.com/apache/dubbo-go/common"
	"github.com/apache/dubbo-go/common/constant"
	"github.com/apache/dubbo-go/common/logger"
	"github.com/apache/dubbo-go/config/instance"
	"github.com/apache/dubbo-go/metadata/definition"
	"github.com/apache/dubbo-go/metadata/identifier"
)

const (
	// defaultMetadataReportRetryTimes is defined for max  times to retry
	defaultMetadataReportRetryTimes int64 = 100
	// defaultMetadataReportRetryPeriod is defined for cycle interval to retry, the unit is second
	defaultMetadataReportRetryPeriod int64 = 3
	// defaultMetadataReportRetryPeriod is defined for cycle report or not
	defaultMetadataReportCycleReport bool = true
)

// metadataReportRetry is a scheduler for retrying task
type metadataReportRetry struct {
	retryPeriod  int64
	retryLimit   int64
	scheduler    *gocron.Scheduler
	job          *gocron.Job
	retryCounter *atomic.Int64
	// if no failed report, wait how many times to run retry task.
	retryTimesIfNonFail int64
}

// newMetadataReportRetry will create a scheduler for retry task
func newMetadataReportRetry(retryPeriod int64, retryLimit int64, retryFunc func() bool) (*metadataReportRetry, error) {
	s1 := gocron.NewScheduler(time.UTC)

	mrr := &metadataReportRetry{
		retryPeriod:         retryPeriod,
		retryLimit:          retryLimit,
		scheduler:           s1,
		retryCounter:        atomic.NewInt64(0),
		retryTimesIfNonFail: 600,
	}

	newJob, err := mrr.scheduler.Every(uint64(mrr.retryPeriod)).Seconds().Do(
		func() {
			mrr.retryCounter.Inc()
			logger.Infof("start to retry task for metadata report. retry times: %v", mrr.retryCounter.Load())
			if mrr.retryCounter.Load() > mrr.retryLimit {
				mrr.scheduler.Clear()
			} else if retryFunc() && mrr.retryCounter.Load() > mrr.retryTimesIfNonFail {
				mrr.scheduler.Clear() // may not interrupt the running job
			}
		})

	mrr.job = newJob
	return mrr, err
}

// startRetryTask will make scheduler with retry task run
func (mrr *metadataReportRetry) startRetryTask() {
	mrr.scheduler.StartAt(time.Now().Add(500 * time.Millisecond))
	mrr.scheduler.Start()
}

// MetadataReport is a absolute delegate for MetadataReport
type MetadataReport struct {
	reportUrl           common.URL
	syncReport          bool
	metadataReportRetry *metadataReportRetry

	failedReports     map[*identifier.MetadataIdentifier]interface{}
	failedReportsLock sync.RWMutex

	// allMetadataReports store all the metdadata reports records in memory
	allMetadataReports     map[*identifier.MetadataIdentifier]interface{}
	allMetadataReportsLock sync.RWMutex
}

// NewMetadataReport will create a MetadataReport with initiation
func NewMetadataReport() (*MetadataReport, error) {
	url := instance.GetMetadataReportUrl()
	bmr := &MetadataReport{
		reportUrl:          url,
		syncReport:         url.GetParamBool(constant.SYNC_REPORT_KEY, false),
		failedReports:      make(map[*identifier.MetadataIdentifier]interface{}, 4),
		allMetadataReports: make(map[*identifier.MetadataIdentifier]interface{}, 4),
	}

	mrr, err := newMetadataReportRetry(
		url.GetParamInt(constant.RETRY_PERIOD_KEY, defaultMetadataReportRetryPeriod),
		url.GetParamInt(constant.RETRY_TIMES_KEY, defaultMetadataReportRetryTimes),
		bmr.retry,
	)

	if err != nil {
		return nil, err
	}

	bmr.metadataReportRetry = mrr
	if url.GetParamBool(constant.CYCLE_REPORT_KEY, defaultMetadataReportCycleReport) {
		scheduler := gocron.NewScheduler(time.UTC)
		_, err := scheduler.Every(1).Day().Do(
			func() {
				logger.Info("start to publish all metadata.")
				bmr.allMetadataReportsLock.RLock()
				bmr.doHandlerMetadataCollection(bmr.allMetadataReports)
				bmr.allMetadataReportsLock.RUnlock()

			})
		if err != nil {
			return nil, err
		}
		scheduler.StartAt(time.Now().Add(500 * time.Millisecond))
		scheduler.Start()
	}
	return bmr, nil
}

// retry will do metadata failed reports collection by call metadata report sdk
func (bmr *MetadataReport) retry() bool {
	bmr.failedReportsLock.RLock()
vito.he's avatar
vito.he committed
	defer bmr.failedReportsLock.RUnlock()
	return bmr.doHandlerMetadataCollection(bmr.failedReports)
}

// StoreProviderMetadata will delegate to call remote metadata's sdk to store provider service definition
func (bmr *MetadataReport) StoreProviderMetadata(identifier *identifier.MetadataIdentifier, definer definition.ServiceDefiner) {
	if bmr.syncReport {
		bmr.storeMetadataTask(common.PROVIDER, identifier, definer)
	}
vito.he's avatar
vito.he committed
	go bmr.storeMetadataTask(common.PROVIDER, identifier, definer)
}

// storeMetadataTask will delegate to call remote metadata's sdk to store
func (bmr *MetadataReport) storeMetadataTask(role int, identifier *identifier.MetadataIdentifier, definer interface{}) {
	logger.Infof("store provider metadata. Identifier :%v ; definition: %v .", identifier, definer)
	bmr.allMetadataReportsLock.Lock()
	bmr.allMetadataReports[identifier] = definer
	bmr.allMetadataReportsLock.Unlock()

	bmr.failedReportsLock.Lock()
	delete(bmr.failedReports, identifier)
	bmr.failedReportsLock.Unlock()
	// data is store the json marshaled definition
	var (
		data []byte
		err  error
	)

	defer func() {
		if r := recover(); r != nil {
			bmr.failedReportsLock.Lock()
			bmr.failedReports[identifier] = definer
			bmr.failedReportsLock.Unlock()
			bmr.metadataReportRetry.startRetryTask()
			logger.Errorf("Failed to put provider metadata %v in %v, cause: %v", identifier, string(data), r)
		}
	}()

	data, err = json.Marshal(definer)
	if err != nil {
		logger.Errorf("storeProviderMetadataTask error in stage json.Marshal, msg is %v", err)
		panic(err)
	}
	report := instance.GetMetadataReportInstance()
	if role == common.PROVIDER {
		err = report.StoreProviderMetadata(identifier, string(data))
	} else if role == common.CONSUMER {
		err = report.StoreConsumerMetadata(identifier, string(data))
	}

	if err != nil {
		logger.Errorf("storeProviderMetadataTask error in stage call  metadata report to StoreProviderMetadata, msg is %v", err)
		panic(err)
	}
}

// StoreConsumerMetadata will delegate to call remote metadata's sdk to store consumer side service definition
func (bmr *MetadataReport) StoreConsumerMetadata(identifier *identifier.MetadataIdentifier, definer map[string]string) {
	if bmr.syncReport {
		bmr.storeMetadataTask(common.CONSUMER, identifier, definer)
	}
vito.he's avatar
vito.he committed
	go bmr.storeMetadataTask(common.CONSUMER, identifier, definer)
}

// SaveServiceMetadata will delegate to call remote metadata's sdk to save service metadata
func (bmr *MetadataReport) SaveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier, url common.URL) error {
	report := instance.GetMetadataReportInstance()
	if bmr.syncReport {
		return report.SaveServiceMetadata(identifier, url)
	}
vito.he's avatar
vito.he committed
	go report.SaveServiceMetadata(identifier, url)
	return nil
}

// RemoveServiceMetadata will delegate to call remote metadata's sdk to remove service metadata
func (bmr *MetadataReport) RemoveServiceMetadata(identifier *identifier.ServiceMetadataIdentifier) error {
	report := instance.GetMetadataReportInstance()
	if bmr.syncReport {
		return report.RemoveServiceMetadata(identifier)
	}
vito.he's avatar
vito.he committed
	go report.RemoveServiceMetadata(identifier)
	return nil
}

// GetExportedURLs will delegate to call remote metadata's sdk to get exported urls
func (bmr *MetadataReport) GetExportedURLs(identifier *identifier.ServiceMetadataIdentifier) []string {
	report := instance.GetMetadataReportInstance()
	return report.GetExportedURLs(identifier)
}

// SaveSubscribedData will delegate to call remote metadata's sdk to save subscribed data
func (bmr *MetadataReport) SaveSubscribedData(identifier *identifier.SubscriberMetadataIdentifier, urls []common.URL) error {
	report := instance.GetMetadataReportInstance()
	if bmr.syncReport {
		return report.SaveSubscribedData(identifier, urls)
	}
vito.he's avatar
vito.he committed
	go report.SaveSubscribedData(identifier, urls)
	return nil
}

// GetSubscribedURLs will delegate to call remote metadata's sdk to get subscribed urls
func (MetadataReport) GetSubscribedURLs(identifier *identifier.SubscriberMetadataIdentifier) []string {
	report := instance.GetMetadataReportInstance()
	return report.GetSubscribedURLs(identifier)
}

// GetServiceDefinition will delegate to call remote metadata's sdk to get service definitions
func (MetadataReport) GetServiceDefinition(identifier *identifier.MetadataIdentifier) string {
	report := instance.GetMetadataReportInstance()
	return report.GetServiceDefinition(identifier)
}

// doHandlerMetadataCollection will store metadata to metadata support with given metadataMap
func (bmr *MetadataReport) doHandlerMetadataCollection(metadataMap map[*identifier.MetadataIdentifier]interface{}) bool {
	if len(metadataMap) == 0 {
		return true
	}
	for e := range metadataMap {
		if common.RoleType(common.PROVIDER).Role() == e.Side {
			bmr.StoreProviderMetadata(e, metadataMap[e].(*definition.FullServiceDefinition))
		} else if common.RoleType(common.CONSUMER).Role() == e.Side {
			bmr.StoreConsumerMetadata(e, metadataMap[e].(map[string]string))
		}
	}
	return false
}