diff --git a/common/constant/time.go b/common/constant/time.go new file mode 100644 index 0000000000000000000000000000000000000000..be1baaca67f474aa92e86e529d03400948ef4612 --- /dev/null +++ b/common/constant/time.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package constant + +import ( + "time" +) + +var ( + MsToNanoRate = int64(time.Millisecond / time.Nanosecond) +) diff --git a/common/extension/metrics.go b/common/extension/metrics.go new file mode 100644 index 0000000000000000000000000000000000000000..8f3234117657667831149a92a8fb7c6fddc7ecd6 --- /dev/null +++ b/common/extension/metrics.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "github.com/apache/dubbo-go/metrics" +) + +var( + metricReporterMap = make(map[string]metrics.Reporter, 4) +) + +// set a reporter with the name +func SetMetricReporter(name string, reporter metrics.Reporter) { + metricReporterMap[name] = reporter +} + +// find the reporter with name. +// if not found, it will panic. +// we should know that this method usually is called when system starts, so we should panic +func GetMetricReporter(name string) metrics.Reporter { + reporter, found := metricReporterMap[name] + if !found { + panic("Cannot find the reporter with name: " + name) + } + return reporter +} diff --git a/common/extension/metrics_test.go b/common/extension/metrics_test.go new file mode 100644 index 0000000000000000000000000000000000000000..74a02e8935f87fb1081543f7abe24d324c685758 --- /dev/null +++ b/common/extension/metrics_test.go @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package extension + +import ( + "context" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/protocol" +) + +func TestGetMetricReporter(t *testing.T) { + reporter := &mockReporter{} + name := "mock" + SetMetricReporter(name, reporter) + res := GetMetricReporter(name) + assert.Equal(t, reporter, res) +} + +type mockReporter struct { +} + +func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration) { +} diff --git a/config/base_config.go b/config/base_config.go index 09495741153cf7caae4bb10ada0aaa686fbf0325..f38ef9b953997888ce47a7bc8aaa2e6577cfe377 100644 --- a/config/base_config.go +++ b/config/base_config.go @@ -40,12 +40,14 @@ type multiConfiger interface { Prefix() string } -// BaseConfig ... +// BaseConfig is the common configuration for provider and consumer type BaseConfig struct { ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"` configCenterUrl *common.URL prefix string fatherConfig interface{} + + MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"` } func (c *BaseConfig) startConfigCenter(ctx context.Context) error { diff --git a/config/config_loader.go b/config/config_loader.go index d6eb7ff524a53c6949d22a2b34eb965274a75232..73bab42b6a9f086d2c31c6cdec47bc3e798b8fcb 100644 --- a/config/config_loader.go +++ b/config/config_loader.go @@ -33,6 +33,7 @@ import ( var ( consumerConfig *ConsumerConfig providerConfig *ProviderConfig + metricConfig *MetricConfig maxWait = 3 ) @@ -75,6 +76,9 @@ func Load() { if consumerConfig == nil { logger.Warnf("consumerConfig is nil!") } else { + + metricConfig = consumerConfig.MetricConfig + checkApplicationName(consumerConfig.ApplicationConfig) if err := configCenterRefreshConsumer(); err != nil { logger.Errorf("[consumer config center refresh] %#v", err) @@ -131,6 +135,10 @@ func Load() { if providerConfig == nil { logger.Warnf("providerConfig is nil!") } else { + + // so, you should know that the consumer's metric config will be override + metricConfig = providerConfig.MetricConfig + checkApplicationName(providerConfig.ApplicationConfig) if err := configCenterRefreshProvider(); err != nil { logger.Errorf("[provider config center refresh] %#v", err) diff --git a/config/metric_config.go b/config/metric_config.go new file mode 100644 index 0000000000000000000000000000000000000000..71071b289b2be0a174fbbb12f5d7c08cceda57b6 --- /dev/null +++ b/config/metric_config.go @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "github.com/creasty/defaults" +) + +// This is the config struct for all metrics implementation +type MetricConfig struct { + Reporters []string `yaml:"reporters" json:"reporters,omitempty"` +} + +// parse the config from yml +func (c *MetricConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain MetricConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +// find the MetricConfig +// if it is nil, create a new one +func GetMetricConfig() *MetricConfig { + if metricConfig == nil { + metricConfig = &MetricConfig{} + } + return metricConfig +} + + diff --git a/config/metric_config_test.go b/config/metric_config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ff8b795506f4fe5d408ca4330186cf0b1ff4ae27 --- /dev/null +++ b/config/metric_config_test.go @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package config + +import ( + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + + +func TestGetMetricConfig(t *testing.T) { + empty := GetMetricConfig() + assert.NotNil(t, empty) +} diff --git a/filter/filter_impl/metrics_filter.go b/filter/filter_impl/metrics_filter.go new file mode 100644 index 0000000000000000000000000000000000000000..e3db21a979f4e9433d420a633c206e0434138a78 --- /dev/null +++ b/filter/filter_impl/metrics_filter.go @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter_impl + +import ( + "context" + "time" + + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/filter" + "github.com/apache/dubbo-go/metrics" + "github.com/apache/dubbo-go/protocol" +) + +const ( + metricFilterName = "metrics" +) + +var ( + metricFilterInstance filter.Filter +) + +// must initialized before using the filter and after loading configuration +func init() { + extension.SetFilter(metricFilterName, newMetricsFilter) +} + +// metricFilter will calculate the invocation's duration and the report to the reporters +// If you want to use this filter to collect the metrics, +// Adding this into your configuration file, like: +// filter: "metrics" +// metrics: +// reporter: +// - "your reporter" # here you should specify the reporter, for example 'prometheus' +// more info please take a look at dubbo-samples projects +type metricsFilter struct { + reporters []metrics.Reporter +} + +// using goroutine to report the duration. +func (p *metricsFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + start := time.Now() + res := invoker.Invoke(ctx, invocation) + end := time.Now() + duration := end.Sub(start) + go func() { + for _, reporter := range p.reporters { + reporter.Report(ctx, invoker, invocation, duration) + } + }() + return res +} + +// do nothing and return the result +func (p *metricsFilter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result { + return res +} + +func newMetricsFilter() filter.Filter { + if metricFilterInstance == nil { + reporterNames := config.GetMetricConfig().Reporters + reporters := make([]metrics.Reporter, 0, len(reporterNames)) + for _, name := range reporterNames { + reporters = append(reporters, extension.GetMetricReporter(name)) + } + metricFilterInstance = &metricsFilter{ + reporters: reporters, + } + } + + return metricFilterInstance +} diff --git a/filter/filter_impl/metrics_filter_test.go b/filter/filter_impl/metrics_filter_test.go new file mode 100644 index 0000000000000000000000000000000000000000..f461a0d14d2dd27da4fb6fc35ae1023118e62962 --- /dev/null +++ b/filter/filter_impl/metrics_filter_test.go @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package filter_impl + +import ( + "context" + "sync" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" +) + +func TestMetricsFilter_Invoke(t *testing.T) { + + // prepare the mock reporter + config.GetMetricConfig().Reporters = []string{"mock"} + mk := &mockReporter{} + extension.SetMetricReporter("mock", mk) + + instance := extension.GetFilter(metricFilterName) + + url, _ := common.NewURL(context.Background(), + "dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider"+ + "&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser."+ + "loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name="+ + "BDTService&organization=ikurento.com&owner=ZX®istry.role=3&retries=&"+ + "service.filter=echo%2Ctoken%2Caccesslog×tamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100") + invoker := protocol.NewBaseInvoker(url) + + attach := make(map[string]string, 10) + inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach) + + ctx := context.Background() + + mk.On("Report", ctx, invoker, inv).Return(true, nil) + + mk.wg.Add(1) + result := instance.Invoke(ctx, invoker, inv) + assert.NotNil(t, result) + mk.AssertNotCalled(t, "Report", 1) + // it will do nothing + result = instance.OnResponse(ctx, nil, invoker, inv) + assert.Nil(t, result) +} + +type mockReporter struct { + mock.Mock + wg sync.WaitGroup +} + +func (m *mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration) { + m.Called(ctx, invoker, invocation) + m.wg.Done() +} diff --git a/metrics/prometheus/prometheus_reporter.go b/metrics/prometheus/prometheus_reporter.go new file mode 100644 index 0000000000000000000000000000000000000000..245e2b0c6730fa44443a2b05d36a96ead1d19a2e --- /dev/null +++ b/metrics/prometheus/prometheus_reporter.go @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package prometheus + +import ( + "context" + "strconv" + "strings" + "time" +) +import ( + "github.com/prometheus/client_golang/prometheus" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/metrics" + "github.com/apache/dubbo-go/protocol" +) + +const ( + reporterName = "prometheus" + serviceKey = constant.SERVICE_KEY + groupKey = constant.GROUP_KEY + versionKey = constant.VERSION_KEY + methodKey = constant.METHOD_KEY + timeoutKey = constant.TIMEOUT_KEY + remoteKey = "remote" + localKey = "local" + + providerKey = "provider" + consumerKey = "consumer" +) + +func init() { + extension.SetMetricReporter(reporterName, newPrometheus()) +} + +// it will collect the data for Prometheus +type PrometheusReporter struct { + + // report the consumer-side's data + consumerVec *prometheus.SummaryVec + // report the provider-side's data + providerVec *prometheus.SummaryVec +} + +// report the duration to Prometheus +func (reporter *PrometheusReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration) { + url := invoker.GetUrl() + var sumVec *prometheus.SummaryVec + if isProvider(url) { + sumVec = reporter.providerVec + } else { + sumVec = reporter.consumerVec + } + + sumVec.With(prometheus.Labels{ + serviceKey: url.Service(), + groupKey: url.GetParam(groupKey, ""), + versionKey: url.GetParam(versionKey, ""), + methodKey: invocation.MethodName(), + timeoutKey: url.GetParam(timeoutKey, ""), + remoteKey: invocation.AttachmentsByKey(constant.REMOTE_ADDR, ""), + localKey: invocation.AttachmentsByKey(constant.REMOTE_ADDR, ""), + }).Observe(float64(cost.Nanoseconds() / constant.MsToNanoRate)) +} + +func isProvider(url common.URL) bool { + side := url.GetParam(constant.ROLE_KEY, "") + return strings.EqualFold(side, strconv.Itoa(common.PROVIDER)) +} + +func newPrometheus() metrics.Reporter { + // cfg := *config.GetMetricConfig().GetPrometheusConfig() + result := &PrometheusReporter{ + consumerVec: newSummaryVec(consumerKey), + providerVec: newSummaryVec(providerKey), + } + prometheus.MustRegister(result.consumerVec, result.providerVec) + return result +} + +func newSummaryVec(side string) *prometheus.SummaryVec { + + return prometheus.NewSummaryVec( + prometheus.SummaryOpts{ + Namespace: metrics.NameSpace, + Help: "this is the dubbo's metrics", + Subsystem: side, + Name: serviceKey, + Objectives: map[float64]float64{ + 0.5: 0.01, + 0.75: 0.01, + 0.90: 0.005, + 0.98: 0.002, + 0.99: 0.001, + 0.999: 0.0001, + }, + }, + []string{serviceKey, groupKey, versionKey, methodKey, timeoutKey, remoteKey, localKey}, + ) +} diff --git a/metrics/reporter.go b/metrics/reporter.go new file mode 100644 index 0000000000000000000000000000000000000000..b6e4c6c0ecbd8ebd44d68d29e8f6e5fcddb991c7 --- /dev/null +++ b/metrics/reporter.go @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package metrics + +import ( + "context" + "time" + + "github.com/apache/dubbo-go/protocol" +) + +const ( + NameSpace = "dubbo" +) + +// it will be use to report the invocation's duration +type Reporter interface { + // report the duration of an invocation + Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration) +}