Skip to content
Snippets Groups Projects
Commit 0a8a866d authored by 邹毅贤's avatar 邹毅贤
Browse files

Merge branch 'develop' into feature/addRouter

# Conflicts:
#	common/constant/key.go
#	config/config_loader.go
#	registry/zookeeper/registry.go
parents b9be732d e072df18
No related branches found
No related tags found
No related merge requests found
Showing
with 437 additions and 26 deletions
......@@ -169,3 +169,21 @@ const (
// Priority Priority key in router module
RouterPriority = "priority"
)
const (
CONSUMER_SIGN_FILTER = "sign"
PROVIDER_AUTH_FILTER = "auth"
SERVICE_AUTH_KEY = "auth"
AUTHENTICATOR_KEY = "authenticator"
DEFAULT_AUTHENTICATOR = "accesskeys"
DEFAULT_ACCESS_KEY_STORAGE = "urlstorage"
ACCESS_KEY_STORAGE_KEY = "accessKey.storage"
REQUEST_TIMESTAMP_KEY = "timestamp"
REQUEST_SIGNATURE_KEY = "signature"
AK_KEY = "ak"
SIGNATURE_STRING_FORMAT = "%s#%s#%s#%s"
PARAMTER_SIGNATURE_ENABLE_KEY = "param.sign"
CONSUMER = "consumer"
ACCESS_KEY_ID_KEY = "accessKeyId"
SECRET_ACCESS_KEY_KEY = "secretAccessKey"
)
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package constant
import (
"time"
)
var (
MsToNanoRate = int64(time.Millisecond / time.Nanosecond)
)
package extension
import (
"github.com/apache/dubbo-go/filter"
)
var (
authenticators = make(map[string]func() filter.Authenticator)
accesskeyStorages = make(map[string]func() filter.AccessKeyStorage)
)
func SetAuthenticator(name string, fcn func() filter.Authenticator) {
authenticators[name] = fcn
}
func GetAuthenticator(name string) filter.Authenticator {
if authenticators[name] == nil {
panic("authenticator for " + name + " is not existing, make sure you have import the package.")
}
return authenticators[name]()
}
func SetAccesskeyStorages(name string, fcn func() filter.AccessKeyStorage) {
accesskeyStorages[name] = fcn
}
func GetAccesskeyStorages(name string) filter.AccessKeyStorage {
if accesskeyStorages[name] == nil {
panic("accesskeyStorages for " + name + " is not existing, make sure you have import the package.")
}
return accesskeyStorages[name]()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"github.com/apache/dubbo-go/metrics"
)
var (
// we couldn't store the instance because the some instance may initialize before loading configuration
// so lazy initialization will be better.
metricReporterMap = make(map[string]func() metrics.Reporter, 4)
)
// SetMetricReporter set a reporter with the name
func SetMetricReporter(name string, reporterFunc func() metrics.Reporter) {
metricReporterMap[name] = reporterFunc
}
// GetMetricReporter find the reporter with name.
// if not found, it will panic.
// we should know that this method usually is called when system starts, so we should panic
func GetMetricReporter(name string) metrics.Reporter {
reporterFunc, found := metricReporterMap[name]
if !found {
panic("Cannot find the reporter with name: " + name)
}
return reporterFunc()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package extension
import (
"context"
"testing"
"time"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/metrics"
"github.com/apache/dubbo-go/protocol"
)
func TestGetMetricReporter(t *testing.T) {
reporter := &mockReporter{}
name := "mock"
SetMetricReporter(name, func() metrics.Reporter {
return reporter
})
res := GetMetricReporter(name)
assert.Equal(t, reporter, res)
}
type mockReporter struct {
}
func (m mockReporter) Report(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation, cost time.Duration, res protocol.Result) {
}
......@@ -360,6 +360,28 @@ func (c URL) ServiceKey() string {
return buf.String()
}
// ColonSeparatedKey
// The format is "{interface}:[version]:[group]"
func (c *URL) ColonSeparatedKey() string {
intf := c.GetParam(constant.INTERFACE_KEY, strings.TrimPrefix(c.Path, "/"))
if intf == "" {
return ""
}
buf := &bytes.Buffer{}
buf.WriteString(intf)
buf.WriteString(":")
version := c.GetParam(constant.VERSION_KEY, "")
if version != "" && version != "0.0.0" {
buf.WriteString(version)
}
group := c.GetParam(constant.GROUP_KEY, "")
buf.WriteString(":")
if group != "" {
buf.WriteString(group)
}
return buf.String()
}
// EncodedServiceKey ...
func (c *URL) EncodedServiceKey() string {
serviceKey := c.ServiceKey()
......
......@@ -271,3 +271,17 @@ func TestClone(t *testing.T) {
assert.Equal(t, u1.Protocol, "dubbo")
assert.Equal(t, u2.Protocol, "provider")
}
func TestColonSeparatedKey(t *testing.T) {
u1, _ := NewURL(context.TODO(), "dubbo://127.0.0.1:20000")
u1.AddParam(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider")
assert.Equal(t, u1.ColonSeparatedKey(), u1.GetParam(constant.INTERFACE_KEY, "")+"::")
u1.AddParam(constant.VERSION_KEY, "version1")
assert.Equal(t, u1.ColonSeparatedKey(), u1.GetParam(constant.INTERFACE_KEY, "")+":version1:")
u1.AddParam(constant.GROUP_KEY, "group1")
assert.Equal(t, u1.ColonSeparatedKey(), u1.GetParam(constant.INTERFACE_KEY, "")+":version1:group1")
u1.SetParam(constant.VERSION_KEY, "")
assert.Equal(t, u1.ColonSeparatedKey(), u1.GetParam(constant.INTERFACE_KEY, "")+"::group1")
}
......@@ -43,12 +43,14 @@ type multiConfiger interface {
Prefix() string
}
// BaseConfig ...
// BaseConfig is the common configuration for provider and consumer
type BaseConfig struct {
ConfigCenterConfig *ConfigCenterConfig `yaml:"config_center" json:"config_center,omitempty"`
configCenterUrl *common.URL
prefix string
fatherConfig interface{}
MetricConfig *MetricConfig `yaml:"metrics" json:"metrics,omitempty"`
}
func (c *BaseConfig) startConfigCenter(ctx context.Context) error {
......
......@@ -31,10 +31,11 @@ import (
)
var (
consumerConfig *ConsumerConfig
providerConfig *ProviderConfig
routerConfig *ConditionRouterConfig
maxWait = 3
consumerConfig *ConsumerConfig
providerConfig *ProviderConfig
metricConfig *MetricConfig
applicationConfig *ApplicationConfig
maxWait = 3
)
// loaded consumer & provider config from xxx.yml, and log config from xxx.xml
......@@ -86,6 +87,10 @@ func Load() {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
} else {
metricConfig = consumerConfig.MetricConfig
applicationConfig = consumerConfig.ApplicationConfig
checkApplicationName(consumerConfig.ApplicationConfig)
if err := configCenterRefreshConsumer(); err != nil {
logger.Errorf("[consumer config center refresh] %#v", err)
......@@ -143,6 +148,11 @@ func Load() {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
} else {
// so, you should know that the consumer's config will be override
metricConfig = providerConfig.MetricConfig
applicationConfig = providerConfig.ApplicationConfig
checkApplicationName(providerConfig.ApplicationConfig)
if err := configCenterRefreshProvider(); err != nil {
logger.Errorf("[provider config center refresh] %#v", err)
......@@ -174,3 +184,42 @@ func GetRPCService(name string) common.RPCService {
func RPCService(service common.RPCService) {
consumerConfig.References[service.Reference()].Implement(service)
}
// GetMetricConfig find the MetricConfig
// if it is nil, create a new one
func GetMetricConfig() *MetricConfig {
if metricConfig == nil {
metricConfig = &MetricConfig{}
}
return metricConfig
}
// GetApplicationConfig find the application config
// if not, we will create one
// Usually applicationConfig will be initialized when system start
func GetApplicationConfig() *ApplicationConfig {
if applicationConfig == nil {
applicationConfig = &ApplicationConfig{}
}
return applicationConfig
}
// GetProviderConfig find the provider config
// if not found, create new one
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
return ProviderConfig{}
}
return *providerConfig
}
// GetConsumerConfig find the consumer config
// if not found, create new one
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
return ConsumerConfig{}
}
return *consumerConfig
}
......@@ -85,15 +85,6 @@ func SetConsumerConfig(c ConsumerConfig) {
consumerConfig = &c
}
// GetConsumerConfig ...
func GetConsumerConfig() ConsumerConfig {
if consumerConfig == nil {
logger.Warnf("consumerConfig is nil!")
return ConsumerConfig{}
}
return *consumerConfig
}
// ConsumerInit ...
func ConsumerInit(confConFile string) error {
if confConFile == "" {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
var (
defaultHistogramBucket = []float64{10, 50, 100, 200, 500, 1000, 10000}
)
// This is the config struct for all metrics implementation
type MetricConfig struct {
Reporters []string `yaml:"reporters" json:"reporters,omitempty"`
HistogramBucket []float64 `yaml:"histogram_bucket" json:"histogram_bucket,omitempty"`
}
// find the histogram bucket
// if it's empty, the default value will be return
func (mc *MetricConfig) GetHistogramBucket() []float64 {
if len(mc.HistogramBucket) == 0 {
mc.HistogramBucket = defaultHistogramBucket
}
return mc.HistogramBucket
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package config
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestGetMetricConfig(t *testing.T) {
empty := GetMetricConfig()
assert.NotNil(t, empty)
}
......@@ -76,15 +76,6 @@ func SetProviderConfig(p ProviderConfig) {
providerConfig = &p
}
// GetProviderConfig ...
func GetProviderConfig() ProviderConfig {
if providerConfig == nil {
logger.Warnf("providerConfig is nil!")
return ProviderConfig{}
}
return *providerConfig
}
// ProviderInit ...
func ProviderInit(confProFile string) error {
if len(confProFile) == 0 {
......
......@@ -67,6 +67,8 @@ type ServiceConfig struct {
TpsLimitRejectedHandler string `yaml:"tps.limit.rejected.handler" json:"tps.limit.rejected.handler,omitempty" property:"tps.limit.rejected.handler"`
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Auth string `yaml:"auth" json:"auth,omitempty" property:"auth"`
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
unexported *atomic.Bool
exported *atomic.Bool
......@@ -220,6 +222,10 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.EXECUTE_LIMIT_KEY, c.ExecuteLimit)
urlMap.Set(constant.EXECUTE_REJECTED_EXECUTION_HANDLER_KEY, c.ExecuteLimitRejectedHandler)
// auth filter
urlMap.Set(constant.SERVICE_AUTH_KEY, c.Auth)
urlMap.Set(constant.PARAMTER_SIGNATURE_ENABLE_KEY, c.ParamSign)
for _, v := range c.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LOADBALANCE_KEY, v.Loadbalance)
......
......@@ -24,8 +24,8 @@ import (
)
import (
"github.com/dubbogo/go-zookeeper/zk"
perrors "github.com/pkg/errors"
"github.com/samuel/go-zookeeper/zk"
)
import (
......@@ -175,7 +175,7 @@ func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &c.wg
}
func (c *zookeeperDynamicConfiguration) GetDone() chan struct{} {
func (c *zookeeperDynamicConfiguration) Done() chan struct{} {
return c.done
}
......
......@@ -24,7 +24,7 @@ import (
)
import (
"github.com/samuel/go-zookeeper/zk"
"github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
......
package filter
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
type AccessKeyPair struct {
AccessKey string `yaml:"accessKey" json:"accessKey,omitempty" property:"accessKey"`
SecretKey string `yaml:"secretKey" json:"secretKey,omitempty" property:"secretKey"`
ConsumerSide string `yaml:"consumerSide" json:"ConsumerSide,consumerSide" property:"consumerSide"`
ProviderSide string `yaml:"providerSide" json:"providerSide,omitempty" property:"providerSide"`
Creator string `yaml:"creator" json:"creator,omitempty" property:"creator"`
Options string `yaml:"options" json:"options,omitempty" property:"options"`
}
// AccessKeyStorage
// This SPI Extension support us to store our AccessKeyPair or load AccessKeyPair from other
// storage, such as filesystem.
type AccessKeyStorage interface {
GetAccessKeyPair(protocol.Invocation, *common.URL) *AccessKeyPair
}
package filter
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/protocol"
)
// Authenticator
type Authenticator interface {
// Sign
// give a sign to request
Sign(protocol.Invocation, *common.URL) error
// Authenticate
// verify the signature of the request is valid or not
Authenticate(protocol.Invocation, *common.URL) error
}
package auth
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
// DefaultAccesskeyStorage
// The default implementation of AccesskeyStorage
type DefaultAccesskeyStorage struct {
}
// GetAccessKeyPair
// get AccessKeyPair from url by the key "accessKeyId" and "secretAccessKey"
func (storage *DefaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Invocation, url *common.URL) *filter.AccessKeyPair {
return &filter.AccessKeyPair{
AccessKey: url.GetParam(constant.ACCESS_KEY_ID_KEY, ""),
SecretKey: url.GetParam(constant.SECRET_ACCESS_KEY_KEY, ""),
}
}
func init() {
extension.SetAccesskeyStorages(constant.DEFAULT_ACCESS_KEY_STORAGE, GetDefaultAccesskeyStorage)
}
func GetDefaultAccesskeyStorage() filter.AccessKeyStorage {
return &DefaultAccesskeyStorage{}
}
package auth
import (
"net/url"
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
invocation2 "github.com/apache/dubbo-go/protocol/invocation"
)
func TestDefaultAccesskeyStorage_GetAccesskeyPair(t *testing.T) {
url := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.SECRET_ACCESS_KEY_KEY, "skey"),
common.WithParamsValue(constant.ACCESS_KEY_ID_KEY, "akey"))
invocation := &invocation2.RPCInvocation{}
storage := GetDefaultAccesskeyStorage()
accesskeyPair := storage.GetAccessKeyPair(invocation, url)
assert.Equal(t, "skey", accesskeyPair.SecretKey)
assert.Equal(t, "akey", accesskeyPair.AccessKey)
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment