Skip to content
Snippets Groups Projects
Unverified Commit 46f93d89 authored by joeyzhouy's avatar joeyzhouy Committed by GitHub
Browse files

Merge pull request #3 from apache/develop

Develop
parents 92537bd7 ab77c924
No related branches found
No related tags found
No related merge requests found
Showing
with 375 additions and 272 deletions
......@@ -18,6 +18,7 @@
package cluster_impl
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"go.uber.org/atomic"
)
......@@ -27,7 +28,6 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -63,7 +63,7 @@ func (invoker *baseClusterInvoker) IsAvailable() bool {
//check invokers availables
func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, invocation protocol.Invocation) error {
if len(invokers) == 0 {
ip, _ := utils.GetLocalIP()
ip, _ := gxnet.GetLocalIP()
return perrors.Errorf("Failed to invoke the method %v. No provider available for the service %v from "+
"registry %v on the consumer %v using the dubbo version %v .Please check if the providers have been started and registered.",
invocation.MethodName(), invoker.directory.GetUrl().SubURL.Key(), invoker.directory.GetUrl().String(), ip, constant.Version)
......@@ -75,7 +75,7 @@ func (invoker *baseClusterInvoker) checkInvokers(invokers []protocol.Invoker, in
//check cluster invoker is destroyed or not
func (invoker *baseClusterInvoker) checkWhetherDestroyed() error {
if invoker.destroyed.Load() {
ip, _ := utils.GetLocalIP()
ip, _ := gxnet.GetLocalIP()
return perrors.Errorf("Rpc cluster invoker for %v on consumer %v use dubbo version %v is now destroyed! can not invoke any more. ",
invoker.directory.GetUrl().Service(), ip, constant.Version)
}
......
......@@ -22,6 +22,7 @@ import (
)
import (
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
......@@ -29,7 +30,6 @@ import (
"github.com/apache/dubbo-go/cluster"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -72,6 +72,9 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
invoked := []protocol.Invoker{}
providers := []string{}
var result protocol.Result
if retries > len(invokers) {
retries = len(invokers)
}
for i := 0; i <= retries; i++ {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
......@@ -87,6 +90,9 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
}
}
ivk := invoker.doSelect(loadbalance, invocation, invokers, invoked)
if ivk == nil {
continue
}
invoked = append(invoked, ivk)
//DO INVOKE
result = ivk.Invoke(invocation)
......@@ -97,7 +103,7 @@ func (invoker *failoverClusterInvoker) Invoke(invocation protocol.Invocation) pr
return result
}
}
ip, _ := utils.GetLocalIP()
ip, _ := gxnet.GetLocalIP()
return &protocol.RPCResult{Err: perrors.Errorf("Failed to invoke the method %v in the service %v. Tried %v times of "+
"the providers %v (%v/%v)from the registry %v on the consumer %v using the dubbo version %v. Last error is %v.",
methodName, invoker.GetUrl().Service(), retries, providers, len(providers), len(invokers), invoker.directory.GetUrl(), ip, constant.Version, result.Error().Error(),
......
......@@ -24,7 +24,8 @@ import (
)
import (
"github.com/dubbogo/gost/container"
"github.com/dubbogo/gost/container/gxset"
gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
......@@ -32,7 +33,6 @@ import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -126,7 +126,7 @@ func (c *ConditionRouter) Route(invokers []protocol.Invoker, url common.URL, inv
if len(c.ThenCondition) == 0 {
return result
}
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
for _, invoker := range invokers {
isMatchThen, err := c.MatchThen(invoker.GetUrl(), url)
if err != nil {
......@@ -157,7 +157,7 @@ func parseRule(rule string) (map[string]MatchPair, error) {
return condition, nil
}
var pair MatchPair
values := container.NewSet()
values := gxset.NewSet()
reg := regexp.MustCompile(`([&!=,]*)\s*([^&!=,\s]+)`)
var startIndex = 0
if indexTuple := reg.FindIndex([]byte(rule)); len(indexTuple) > 0 {
......@@ -170,8 +170,8 @@ func parseRule(rule string) (map[string]MatchPair, error) {
switch separator {
case "":
pair = MatchPair{
Matches: container.NewSet(),
Mismatches: container.NewSet(),
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
condition[content] = pair
case "&":
......@@ -179,8 +179,8 @@ func parseRule(rule string) (map[string]MatchPair, error) {
pair = r
} else {
pair = MatchPair{
Matches: container.NewSet(),
Mismatches: container.NewSet(),
Matches: gxset.NewSet(),
Mismatches: gxset.NewSet(),
}
condition[content] = pair
}
......@@ -257,8 +257,8 @@ func MatchCondition(pairs map[string]MatchPair, url *common.URL, param *common.U
}
type MatchPair struct {
Matches *container.HashSet
Mismatches *container.HashSet
Matches *gxset.HashSet
Mismatches *gxset.HashSet
}
func (pair MatchPair) isMatch(value string, param *common.URL) bool {
......
......@@ -26,6 +26,7 @@ import (
)
import (
"github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
......@@ -33,7 +34,6 @@ import (
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
......@@ -146,7 +146,8 @@ func TestRoute_matchWhen(t *testing.T) {
}
func TestRoute_matchFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
t.Logf("The local ip is %s", localIP)
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService?default.serialization=fastjson")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......@@ -204,7 +205,7 @@ func TestRoute_methodRoute(t *testing.T) {
func TestRoute_ReturnFalse(t *testing.T) {
url, _ := common.NewURL(context.TODO(), "")
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => false"))
......@@ -215,7 +216,7 @@ func TestRoute_ReturnFalse(t *testing.T) {
}
func TestRoute_ReturnEmpty(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url, _ := common.NewURL(context.TODO(), "")
invokers := []protocol.Invoker{NewMockInvoker(url, 1), NewMockInvoker(url, 2), NewMockInvoker(url, 3)}
inv := &invocation.RPCInvocation{}
......@@ -227,7 +228,7 @@ func TestRoute_ReturnEmpty(t *testing.T) {
}
func TestRoute_ReturnAll(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
invokers := []protocol.Invoker{&MockInvoker{}, &MockInvoker{}, &MockInvoker{}}
inv := &invocation.RPCInvocation{}
rule := base64.URLEncoding.EncodeToString([]byte("host = " + localIP + " => " + " host = " + localIP))
......@@ -238,7 +239,7 @@ func TestRoute_ReturnAll(t *testing.T) {
}
func TestRoute_HostFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......@@ -257,7 +258,7 @@ func TestRoute_HostFilter(t *testing.T) {
}
func TestRoute_Empty_HostFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......@@ -276,7 +277,7 @@ func TestRoute_Empty_HostFilter(t *testing.T) {
}
func TestRoute_False_HostFilter(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......@@ -295,7 +296,7 @@ func TestRoute_False_HostFilter(t *testing.T) {
}
func TestRoute_Placeholder(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......@@ -314,7 +315,7 @@ func TestRoute_Placeholder(t *testing.T) {
}
func TestRoute_NoForce(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......@@ -331,7 +332,7 @@ func TestRoute_NoForce(t *testing.T) {
}
func TestRoute_Force(t *testing.T) {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
url1, _ := common.NewURL(context.TODO(), "dubbo://10.20.3.3:20880/com.foo.BarService")
url2, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
url3, _ := common.NewURL(context.TODO(), fmt.Sprintf("dubbo://%s:20880/com.foo.BarService", localIP))
......
......@@ -46,7 +46,7 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
DEFAULT_SERVICE_FILTERS = "echo,token"
DEFAULT_SERVICE_FILTERS = "echo,token,accesslog"
DEFAULT_REFERENCE_FILTERS = ""
GENERIC_REFERENCE_FILTERS = "generic"
GENERIC = "$invoke"
......
......@@ -38,6 +38,8 @@ const (
GENERIC_KEY = "generic"
CLASSIFIER_KEY = "classifier"
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
)
const (
......@@ -58,6 +60,7 @@ const (
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
ACCESS_LOG_KEY = "accesslog"
)
const (
......
......@@ -31,7 +31,7 @@ import (
)
import (
"github.com/dubbogo/gost/container"
"github.com/dubbogo/gost/container/gxset"
"github.com/jinzhu/copier"
perrors "github.com/pkg/errors"
"github.com/satori/go.uuid"
......@@ -447,7 +447,7 @@ func (c URL) GetMethodParam(method string, key string, d string) string {
return r
}
func (c *URL) RemoveParams(set *container.HashSet) {
func (c *URL) RemoveParams(set *gxset.HashSet) {
c.paramsLock.Lock()
defer c.paramsLock.Unlock()
for k := range set.Items {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"net"
"strings"
)
import (
perrors "github.com/pkg/errors"
)
var (
privateBlocks []*net.IPNet
)
func init() {
for _, b := range []string{"10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16"} {
if _, block, err := net.ParseCIDR(b); err == nil {
privateBlocks = append(privateBlocks, block)
}
}
}
func GetLocalIP() (string, error) {
faces, err := net.Interfaces()
if err != nil {
return "", perrors.WithStack(err)
}
var addr net.IP
for _, face := range faces {
if !isValidNetworkInterface(face) {
continue
}
addrs, err := face.Addrs()
if err != nil {
return "", perrors.WithStack(err)
}
if ipv4, ok := getValidIPv4(addrs); ok {
addr = ipv4
if isPrivateIP(ipv4) {
return ipv4.String(), nil
}
}
}
if addr == nil {
return "", perrors.Errorf("can not get local IP")
}
return addr.String(), nil
}
func isPrivateIP(ip net.IP) bool {
for _, priv := range privateBlocks {
if priv.Contains(ip) {
return true
}
}
return false
}
func getValidIPv4(addrs []net.Addr) (net.IP, bool) {
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
}
if ip == nil || ip.IsLoopback() {
continue
}
ip = ip.To4()
if ip == nil {
// not an valid ipv4 address
continue
}
return ip, true
}
return nil, false
}
func isValidNetworkInterface(face net.Interface) bool {
if face.Flags&net.FlagUp == 0 {
// interface down
return false
}
if face.Flags&net.FlagLoopback != 0 {
// loopback interface
return false
}
if strings.Contains(strings.ToLower(face.Name), "docker") {
return false
}
return true
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func TestGetLocalIP(t *testing.T) {
ip, err := GetLocalIP()
assert.NoError(t, err)
t.Log(ip)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"regexp"
)
func RegSplit(text string, delimeter string) []string {
reg := regexp.MustCompile(delimeter)
indexes := reg.FindAllStringIndex(text, -1)
laststart := 0
result := make([]string, len(indexes)+1)
for i, element := range indexes {
result[i] = text[laststart:element[0]]
laststart = element[1]
}
result[len(indexes)] = text[laststart:len(text)]
return result
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package utils
import (
"testing"
)
import (
"github.com/stretchr/testify/assert"
)
func Test_RegSplit(t *testing.T) {
strings := RegSplit("dubbo://123.1.2.1;jsonrpc://127.0.0.1;registry://3.2.1.3?registry=zookeeper", "\\s*[;]+\\s*")
assert.Len(t, strings, 3)
assert.Equal(t, "dubbo://123.1.2.1", strings[0])
assert.Equal(t, "jsonrpc://127.0.0.1", strings[1])
assert.Equal(t, "registry://3.2.1.3?registry=zookeeper", strings[2])
}
......@@ -37,8 +37,8 @@ func (c *ProtocolConfig) Prefix() string {
func loadProtocol(protocolsIds string, protocols map[string]*ProtocolConfig) []*ProtocolConfig {
returnProtocols := []*ProtocolConfig{}
for _, v := range strings.Split(protocolsIds, ",") {
for _, prot := range protocols {
if v == prot.Name {
for k, prot := range protocols {
if v == k {
returnProtocols = append(returnProtocols, prot)
}
}
......
......@@ -27,6 +27,7 @@ import (
import (
"github.com/creasty/defaults"
gxstrings "github.com/dubbogo/gost/strings"
)
import (
......@@ -35,7 +36,6 @@ import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/proxy"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/protocol"
)
......@@ -92,7 +92,7 @@ func (refconfig *ReferenceConfig) Refer() {
//1. user specified URL, could be peer-to-peer address, or register center's address.
if refconfig.Url != "" {
urlStrings := utils.RegSplit(refconfig.Url, "\\s*[;]+\\s*")
urlStrings := gxstrings.RegSplit(refconfig.Url, "\\s*[;]+\\s*")
for _, urlStr := range urlStrings {
serviceUrl, err := common.NewURL(context.Background(), urlStr)
if err != nil {
......
......@@ -58,6 +58,7 @@ type ServiceConfig struct {
Retries string `yaml:"retries" json:"retries,omitempty" property:"retries"`
Params map[string]string `yaml:"params" json:"params,omitempty" property:"params"`
Token string `yaml:"token" json:"token,omitempty" property:"token"`
AccessLog string `yaml:"accesslog" json:"accesslog,omitempty" property:"accesslog"`
unexported *atomic.Bool
exported *atomic.Bool
rpcService common.RPCService
......@@ -188,6 +189,9 @@ func (srvconfig *ServiceConfig) getUrlMap() url.Values {
//filter
urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, srvconfig.Filter, constant.DEFAULT_SERVICE_FILTERS))
//filter special config
urlMap.Set(constant.ACCESS_LOG_KEY, srvconfig.AccessLog)
for _, v := range srvconfig.Methods {
urlMap.Set("methods."+v.Name+"."+constant.LOADBALANCE_KEY, v.Loadbalance)
urlMap.Set("methods."+v.Name+"."+constant.RETRIES_KEY, v.Retries)
......
......@@ -21,14 +21,14 @@ import (
)
import (
"github.com/dubbogo/gost/container"
"github.com/dubbogo/gost/container/gxset"
gxnet "github.com/dubbogo/gost/net"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/utils"
"github.com/apache/dubbo-go/config_center"
)
......@@ -59,7 +59,7 @@ func (c *overrideConfigurator) Configure(url *common.URL) {
currentSide := url.GetParam(constant.SIDE_KEY, "")
configuratorSide := c.configuratorUrl.GetParam(constant.SIDE_KEY, "")
if currentSide == configuratorSide && common.DubboRole[common.CONSUMER] == currentSide && c.configuratorUrl.Port == "0" {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
c.configureIfMatch(localIP, url)
} else if currentSide == configuratorSide && common.DubboRole[common.PROVIDER] == currentSide && c.configuratorUrl.Port == url.Port {
c.configureIfMatch(url.Ip, url)
......@@ -78,7 +78,7 @@ func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
configApp := c.configuratorUrl.GetParam(constant.APPLICATION_KEY, c.configuratorUrl.Username)
currentApp := url.GetParam(constant.APPLICATION_KEY, url.Username)
if len(configApp) == 0 || constant.ANY_VALUE == configApp || configApp == currentApp {
conditionKeys := container.NewSet()
conditionKeys := gxset.NewSet()
conditionKeys.Add(constant.CATEGORY_KEY)
conditionKeys.Add(constant.CHECK_KEY)
conditionKeys.Add(constant.ENABLED_KEY)
......@@ -122,7 +122,7 @@ func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
// 1.If it is a consumer ip address, the intention is to control a specific consumer instance, it must takes effect at the consumer side, any provider received this override url should ignore;
// 2.If the ip is 0.0.0.0, this override url can be used on consumer, and also can be used on provider
if url.GetParam(constant.SIDE_KEY, "") == common.DubboRole[common.CONSUMER] {
localIP, _ := utils.GetLocalIP()
localIP, _ := gxnet.GetLocalIP()
c.configureIfMatch(localIP, url)
} else {
c.configureIfMatch(constant.ANYHOST_VALUE, url)
......
......@@ -25,7 +25,6 @@ registries :
username: ""
password: ""
services:
"UserProvider":
# 可以指定多个registry,使用逗号隔开;不指定默认向所有注册中心注册
......@@ -36,6 +35,10 @@ services:
loadbalance: "random"
warmup: "100"
cluster: "failover"
# config the path to access log file. The value can be "true" or "default" which means that the access log will be log in general log file (which config in log.yml)
# accesslog: "/path/to/your/accesslog/file"
methods:
- name: "GetUser"
retries: "1"
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"os"
"reflect"
"strings"
"time"
)
import (
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/filter"
"github.com/apache/dubbo-go/protocol"
)
const (
//used in URL.
FileDateFormat = "2006-01-02"
MessageDateLayout = "2006-01-02 15:04:05"
LogMaxBuffer = 5000
LogFileMode = 0600
// those fields are the data collected by this filter
Types = "types"
Arguments = "arguments"
)
func init() {
extension.SetFilter(constant.ACCESS_LOG_KEY, GetAccessLogFilter)
}
/*
* Although the access log filter is a default filter,
* you should config "accesslog" in service's config to tell the filter where store the access log.
* for example:
* "UserProvider":
* registry: "hangzhouzk"
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* accesslog: "/your/path/to/store/the/log/", # it should be the path of file.
*
* the value of "accesslog" can be "true" or "default" too.
* If the value is one of them, the access log will be record in log file which defined in log.yml
*/
type AccessLogFilter struct {
logChan chan AccessLogData
}
func (ef *AccessLogFilter) Invoke(invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
accessLog := invoker.GetUrl().GetParam(constant.ACCESS_LOG_KEY, "")
if len(accessLog) > 0 {
accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog}
ef.logIntoChannel(accessLogData)
}
return invoker.Invoke(invocation)
}
// it won't block the invocation
func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
select {
case ef.logChan <- accessLogData:
return
default:
logger.Warn("The channel is full and the access logIntoChannel data will be dropped")
return
}
}
func (ef *AccessLogFilter) buildAccessLogData(invoker protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
dataMap[constant.INTERFACE_KEY] = attachments[constant.INTERFACE_KEY]
dataMap[constant.METHOD_KEY] = invocation.MethodName()
dataMap[constant.VERSION_KEY] = attachments[constant.VERSION_KEY]
dataMap[constant.GROUP_KEY] = attachments[constant.GROUP_KEY]
dataMap[constant.TIMESTAMP_KEY] = time.Now().Format(MessageDateLayout)
dataMap[constant.LOCAL_ADDR], _ = attachments[constant.LOCAL_ADDR]
dataMap[constant.REMOTE_ADDR], _ = attachments[constant.REMOTE_ADDR]
if len(invocation.Arguments()) > 0 {
builder := strings.Builder{}
// todo(after the paramTypes were set to the invocation. we should change this implementation)
typeBuilder := strings.Builder{}
builder.WriteString(reflect.ValueOf(invocation.Arguments()[0]).String())
typeBuilder.WriteString(reflect.TypeOf(invocation.Arguments()[0]).Name())
for idx := 1; idx < len(invocation.Arguments()); idx++ {
arg := invocation.Arguments()[idx]
builder.WriteString(",")
builder.WriteString(reflect.ValueOf(arg).String())
typeBuilder.WriteString(",")
typeBuilder.WriteString(reflect.TypeOf(arg).Name())
}
dataMap[Arguments] = builder.String()
dataMap[Types] = typeBuilder.String()
}
return dataMap
}
func (ef *AccessLogFilter) OnResponse(result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
accessLog := data.accessLog
if isDefault(accessLog) {
logger.Info(data.toLogMessage())
return
}
logFile, err := ef.openLogFile(accessLog)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return
}
logger.Debugf("Append log to %s", accessLog)
message := data.toLogMessage()
message = message + "\n"
_, err = logFile.WriteString(message)
if err != nil {
logger.Warnf("Can not write the log into access log file: %s, %v", accessLog, err)
}
}
func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
logFile, err := os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return nil, err
}
now := time.Now().Format(FileDateFormat)
fileInfo, err := logFile.Stat()
if err != nil {
logger.Warnf("Can not get the info of access log file: %s, %v", accessLog, err)
return nil, err
}
last := fileInfo.ModTime().Format(FileDateFormat)
if now != last {
err = os.Rename(fileInfo.Name(), fileInfo.Name()+"."+now)
if err != nil {
logger.Warnf("Can not rename access log file: %s, %v", fileInfo.Name(), err)
return nil, err
}
logFile, err = os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
}
return logFile, err
}
func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}
func GetAccessLogFilter() filter.Filter {
accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
}
}()
return accessLogFilter
}
type AccessLogData struct {
accessLog string
data map[string]string
}
func (ef *AccessLogData) toLogMessage() string {
builder := strings.Builder{}
builder.WriteString("[")
builder.WriteString(ef.data[constant.TIMESTAMP_KEY])
builder.WriteString("] ")
builder.WriteString(ef.data[constant.REMOTE_ADDR])
builder.WriteString(" -> ")
builder.WriteString(ef.data[constant.LOCAL_ADDR])
builder.WriteString(" - ")
if len(ef.data[constant.GROUP_KEY]) > 0 {
builder.WriteString(ef.data[constant.GROUP_KEY])
builder.WriteString("/")
}
builder.WriteString(ef.data[constant.INTERFACE_KEY])
if len(ef.data[constant.VERSION_KEY]) > 0 {
builder.WriteString(":")
builder.WriteString(ef.data[constant.VERSION_KEY])
}
builder.WriteString(" ")
builder.WriteString(ef.data[constant.METHOD_KEY])
builder.WriteString("(")
if len(ef.data[Types]) > 0 {
builder.WriteString(ef.data[Types])
}
builder.WriteString(") ")
if len(ef.data[Arguments]) > 0 {
builder.WriteString(ef.data[Arguments])
}
return builder.String()
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package impl
import (
"context"
"testing"
)
import (
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
"github.com/apache/dubbo-go/common/constant"
"github.com/apache/dubbo-go/protocol"
"github.com/apache/dubbo-go/protocol/invocation"
)
func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(context.Background(),
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider"+
"&cluster=failover&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser."+
"loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name="+
"BDTService&organization=ikurento.com&owner=ZX&registry.role=3&retries=&"+
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
attach := make(map[string]string, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
accessLogFilter := GetAccessLogFilter()
result := accessLogFilter.Invoke(invoker, inv)
assert.Nil(t, result.Error())
}
func TestAccessLogFilter_Invoke_Default_Config(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(context.Background(),
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider"+
"&cluster=failover&accesslog=true&environment=dev&group=&interface=com.ikurento.user.UserProvider&loadbalance=random&methods.GetUser."+
"loadbalance=random&methods.GetUser.retries=1&methods.GetUser.weight=0&module=dubbogo+user-info+server&name="+
"BDTService&organization=ikurento.com&owner=ZX&registry.role=3&retries=&"+
"service.filter=echo%2Ctoken%2Caccesslog&timestamp=1569153406&token=934804bf-b007-4174-94eb-96e3e1d60cc7&version=&warmup=100")
invoker := protocol.NewBaseInvoker(url)
attach := make(map[string]string, 10)
attach[constant.VERSION_KEY] = "1.0"
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
accessLogFilter := GetAccessLogFilter()
result := accessLogFilter.Invoke(invoker, inv)
assert.Nil(t, result.Error())
}
func TestAccessLogFilter_OnResponse(t *testing.T) {
result := &protocol.RPCResult{}
accessLogFilter := GetAccessLogFilter()
response := accessLogFilter.OnResponse(result, nil, nil)
assert.Equal(t, result, response)
}
......@@ -4,7 +4,7 @@ require (
github.com/Workiva/go-datastructures v1.0.50
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e // indirect
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5
github.com/buger/jsonparser v0.0.0-20181115193947-bf1c66bbce23 // indirect
github.com/coreos/bbolt v1.3.3 // indirect
github.com/coreos/etcd v3.3.13+incompatible
......@@ -12,8 +12,8 @@ require (
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.2.2
github.com/dubbogo/gost v1.1.1
github.com/dubbogo/getty v1.3.0
github.com/dubbogo/gost v1.3.0
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
......
......@@ -35,8 +35,10 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e h1:MSuLXx/mveDbpDNhVrcWTMeV4lbYWKcyO4rH+jAxmX0=
github.com/aliyun/alibaba-cloud-sdk-go v0.0.0-20190802083043-4cd0c391755e/go.mod h1:myCDvQSzCW+wB1WAlocEru4wMGJxy+vlxHdhegi1CDQ=
github.com/aliyun/aliyun-oss-go-sdk v0.0.0-20190307165228-86c17b95fcd5/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22 h1:Ku+3LFRYVelgo/INS9893QOUeIiKNeNKzK3CzDcqt/4=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190909140437-80cbb25cbb22/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190921023740-335b8c601359 h1:ti5HOgxW/aKonsBe4Sj/W3+RMq4Jxl/EAAblROneggg=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190921023740-335b8c601359/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5 h1:p85EqnwOfcqqayW7OPREn0YJxIPIuEmuBJPezzhtO/M=
github.com/apache/dubbo-go-hessian2 v1.2.5-0.20190923055845-e3dd5d131df5/go.mod h1:LWnndnrFXZmJLAzoyNAPNHSIJ1KOHVkTSsHgC3YYWlo=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e h1:QEF07wC0T1rKkctt1RINW/+RMTVmiwxETico2l3gxJA=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
......@@ -102,10 +104,12 @@ github.com/docker/go-connections v0.3.0 h1:3lOnM9cSzgGwx8VfK/NGOW5fLQ0GjIlCkaktF
github.com/docker/go-connections v0.3.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk=
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dubbogo/getty v1.2.2 h1:qDC9WXjxcs5NPvWZz2ruVKBKr2r1Jjm6i0Sq//CQwbE=
github.com/dubbogo/getty v1.2.2/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU=
github.com/dubbogo/getty v1.3.0 h1:GImOCANdts7dlRqi9GMVsZJnfst9EPyjTVTR1AesOD8=
github.com/dubbogo/getty v1.3.0/go.mod h1:K4b3MkGLf7T+lMgQNFgpg0dI1Wvv1PTisFs1Psf86kU=
github.com/dubbogo/gost v1.1.1 h1:JCM7vx5edPIjDA5ovJTuzEEXuw2t7xLyrlgi2mi5jHI=
github.com/dubbogo/gost v1.1.1/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/dubbogo/gost v1.3.0 h1:n90mIUWCPD69BqW8wJ43NDy0RgNxx02aAG4QJcJ785U=
github.com/dubbogo/gost v1.3.0/go.mod h1:R7wZm1DrmrKGr50mBZVcg6C9ekG8aL5hP+sgWcIDwQg=
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74 h1:2MIhn2R6oXQbgW5yHfS+d6YqyMfXiu2L55rFZC4UD/M=
github.com/duosecurity/duo_api_golang v0.0.0-20190308151101-6c680f768e74/go.mod h1:UqXY1lYT/ERa4OEAywUqdok1T4RCRdArkhic1Opuavo=
github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y=
......@@ -184,7 +188,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGa
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:1yOKgt0XYKUg1HOKunGOSt2ocU4bxLCjmIHt0vRtVHM=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible h1:AQwinXlbQR2HvPjQZOmDhRqsv5mZf+Jb1RnSLxcqZcI=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
......@@ -517,6 +521,7 @@ golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGm
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 h1:5Beo0mZN8dRzgrMMkDp0jc8YXQKx9DiJ2k1dkvGsn5A=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
google.golang.org/api v0.0.0-20180829000535-087779f1d2c9 h1:z1TeLUmxf9ws9KLICfmX+KGXTs+rjm+aGWzfsv7MZ9w=
google.golang.org/api v0.0.0-20180829000535-087779f1d2c9/go.mod h1:4mhQ8q/RsB7i+udVvVy5NUi08OU8ZlA0gRVgrF7VFY0=
......@@ -553,7 +558,7 @@ gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bl
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:y0IMTfclpMdsdIbr6uwmJn5/WZ7vFuObxDMdrylFM3A=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment