diff --git a/common/constant/default.go b/common/constant/default.go index 992fc32748bb4fc7777cffecc9137663c681c3f7..61abff6d4b7e9f8cfd4e2c8c97eee418dd96b062 100644 --- a/common/constant/default.go +++ b/common/constant/default.go @@ -41,6 +41,8 @@ const ( DEFAULT_FAILBACK_TIMES = "3" DEFAULT_FAILBACK_TIMES_INT = 3 DEFAULT_FAILBACK_TASKS = 100 + DEFAULT_REST_CLIENT = "resty" + DEFAULT_REST_SERVER = "go-restful" ) const ( diff --git a/common/extension/rest_client.go b/common/extension/rest_client.go new file mode 100644 index 0000000000000000000000000000000000000000..7870ad9e7699c5a1bcfe42a396e45cb280892103 --- /dev/null +++ b/common/extension/rest_client.go @@ -0,0 +1,20 @@ +package extension + +import ( + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +var ( + restClients = make(map[string]func(restOptions *rest_interface.RestOptions) rest_interface.RestClient) +) + +func SetRestClient(name string, fun func(restOptions *rest_interface.RestOptions) rest_interface.RestClient) { + restClients[name] = fun +} + +func GetNewRestClient(name string, restOptions *rest_interface.RestOptions) rest_interface.RestClient { + if restClients[name] == nil { + panic("restClient for " + name + " is not existing, make sure you have import the package.") + } + return restClients[name](restOptions) +} diff --git a/common/extension/rest_config_reader.go b/common/extension/rest_config_reader.go new file mode 100644 index 0000000000000000000000000000000000000000..5660ef29e17d2369a0da60caee67e6b56880d976 --- /dev/null +++ b/common/extension/rest_config_reader.go @@ -0,0 +1,24 @@ +package extension + +import ( + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +var ( + restConfigReaders = make(map[string]func() rest_interface.RestConfigReader) +) + +func SetRestConfigReader(name string, fun func() rest_interface.RestConfigReader) { + restConfigReaders[name] = fun +} + +func GetSingletonRestConfigReader(name string) rest_interface.RestConfigReader { + if name == "" { + name = "default" + } + if restConfigReaders[name] == nil { + panic("restConfigReaders for " + name + " is not existing, make sure you have import the package.") + } + return restConfigReaders[name]() + +} diff --git a/common/extension/rest_server.go b/common/extension/rest_server.go new file mode 100644 index 0000000000000000000000000000000000000000..3ca1d44335765849008c6192910fd67dec2a87bc --- /dev/null +++ b/common/extension/rest_server.go @@ -0,0 +1,20 @@ +package extension + +import ( + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +var ( + restServers = make(map[string]func() rest_interface.RestServer) +) + +func SetRestServer(name string, fun func() rest_interface.RestServer) { + restServers[name] = fun +} + +func GetNewRestServer(name string) rest_interface.RestServer { + if restServers[name] == nil { + panic("restServer for " + name + " is not existing, make sure you have import the package.") + } + return restServers[name]() +} diff --git a/config/consumer_config.go b/config/consumer_config.go index 7756f3b51c0f46a19687affb4dc6eadf9ef711c7..c3e953cce4dc30e279e1156d7b5e823ba08950d4 100644 --- a/config/consumer_config.go +++ b/config/consumer_config.go @@ -61,6 +61,7 @@ type ConsumerConfig struct { ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf"` FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` + RestConfigType string `default:"default" yaml:"rest_config_type" json:"rest_config_type,omitempty" property:"rest_config_type"` } // UnmarshalYAML ... diff --git a/config/provider_config.go b/config/provider_config.go index 0bfa78647b58d9b6eb961adc5485207faffe1e1e..a751a8f65db6764aeb02d225cb163e72a55eae2f 100644 --- a/config/provider_config.go +++ b/config/provider_config.go @@ -52,6 +52,7 @@ type ProviderConfig struct { ProtocolConf interface{} `yaml:"protocol_conf" json:"protocol_conf,omitempty" property:"protocol_conf" ` FilterConf interface{} `yaml:"filter_conf" json:"filter_conf,omitempty" property:"filter_conf" ` ShutdownConfig *ShutdownConfig `yaml:"shutdown_conf" json:"shutdown_conf,omitempty" property:"shutdown_conf" ` + RestConfigType string `default:"default" yaml:"rest_config_type" json:"rest_config_type,omitempty" property:"rest_config_type"` } // UnmarshalYAML ... diff --git a/go.mod b/go.mod index db6dc92c63176334f6dfa0436889ffab6f3c9c53..cb94f391e2b672c6849318a43140671e4af611e2 100644 --- a/go.mod +++ b/go.mod @@ -14,8 +14,10 @@ require ( github.com/creasty/defaults v1.3.0 github.com/dubbogo/getty v1.3.2 github.com/dubbogo/gost v1.5.2 + github.com/emicklei/go-restful/v3 v3.0.0 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-errors/errors v1.0.1 // indirect + github.com/go-resty/resty/v2 v2.1.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 github.com/golang/protobuf v1.3.2 @@ -52,6 +54,7 @@ require ( go.etcd.io/etcd v3.3.13+incompatible go.uber.org/atomic v1.4.0 go.uber.org/zap v1.10.0 + golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 google.golang.org/grpc v1.22.1 gopkg.in/yaml.v2 v2.2.2 ) diff --git a/go.sum b/go.sum index f215a81b209579c0bb5de0123153b10b7a36767b..5e1b342d33484f7ea5c2accbe41400a6219fb349 100644 --- a/go.sum +++ b/go.sum @@ -118,6 +118,8 @@ github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFP github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0 h1:ZoRgc53qJCfSLimXqJDrmBhnt5GChDsExMCK7t48o0Y= github.com/elazarl/go-bindata-assetfs v0.0.0-20160803192304-e1a2a7ec64b0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g= +github.com/emicklei/go-restful/v3 v3.0.0 h1:Duxxa4x0WIHW3bYEDmoAPNjmy8Rbqn+utcF74dlF/G8= +github.com/emicklei/go-restful/v3 v3.0.0/go.mod h1:6n3XBCmQQb25CM2LCACGz8ukIrRry+4bhvbpWn3mrbc= github.com/envoyproxy/go-control-plane v0.8.0 h1:uE6Fp4fOcAJdc1wTQXLJ+SYistkbG1dNoi6Zs1+Ybvk= github.com/envoyproxy/go-control-plane v0.8.0/go.mod h1:GSSbY9P1neVhdY7G4wu+IK1rk/dqhiCC/4ExuWJZVuk= github.com/envoyproxy/protoc-gen-validate v0.0.14 h1:YBW6/cKy9prEGRYLnaGa4IDhzxZhRCtKsax8srGKDnM= @@ -131,6 +133,8 @@ github.com/fatih/structs v0.0.0-20180123065059-ebf56d35bba7/go.mod h1:9NiDSp5zOc github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/go-errors/errors v1.0.1 h1:LUHzmkK3GUKUrL/1gfBUxAHzcev3apQlezX/+O7ma6w= github.com/go-errors/errors v1.0.1/go.mod h1:f4zRHt4oKfwPJE5k8C9vpYG+aDHdBFUsgrm6/TyX73Q= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= @@ -142,6 +146,12 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-ole/go-ole v1.2.1 h1:2lOsA72HgjxAuMlKpFiCbHTvu44PIVkZ5hqm3RSdI/E= github.com/go-ole/go-ole v1.2.1/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8= +github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc= +github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= +github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= +github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= +github.com/go-resty/resty/v2 v2.1.0 h1:Z6IefCpUMfnvItVJaJXWv/pMiiD11So35QgwEELsldE= +github.com/go-resty/resty/v2 v2.1.0/go.mod h1:dZGr0i9PLlaaTD4H/hoZIDjQ+r6xq8mgbRzHZf7f2J8= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4 h1:1LlmVz15APoKz9dnm5j2ePptburJlwEH+/v/pUuoxck= github.com/go-sql-driver/mysql v0.0.0-20180618115901-749ddf1598b4/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= @@ -321,6 +331,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= +github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570 h1:0iQektZGS248WXmGIYOwRXSQhD4qn3icjMpuxwO7qlo= github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE= github.com/lestrrat/go-file-rotatelogs v0.0.0-20180223000712-d3151e2a480f h1:sgUSP4zdTUZYZgAGGtN5Lxk92rK+JUFOwf+FT99EEI4= @@ -336,6 +348,8 @@ github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRU github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI= github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg= +github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/miekg/dns v1.0.14 h1:9jZdLNd/P4+SfEJ0TNyxYpsK8N4GtfylBLqtbYN1sbA= @@ -480,7 +494,6 @@ github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3 h1:kF/7m/ZU+0D github.com/toolkits/concurrent v0.0.0-20150624120057-a4371d70e3e3/go.mod h1:QDlpd3qS71vYtakd2hmdpqhJ9nwv6mD6A30bQ1BPBFE= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926 h1:G3dpKMzFDjgEh2q1Z7zUUtKa8ViPtH+ocF0bE0g00O8= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/uber/jaeger-client-go v2.17.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk= github.com/vmware/govmomi v0.18.0 h1:f7QxSmP7meCtoAmiKZogvVbLInT+CZx6Px6K5rYsJZo= github.com/vmware/govmomi v0.18.0/go.mod h1:URlwyTFZX72RmxtxuaFL2Uj3fD1JTvZdx59bHWk6aFU= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 h1:eY9dn8+vbi4tKz5Qo6v2eYzo7kUS51QINcR5jNpbZS8= @@ -516,6 +529,8 @@ golang.org/x/net v0.0.0-20190320064053-1272bf9dcd53/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980 h1:dfGZHvZk057jK2MCeWus/TowKpJ8y4AmooUzdBSR9GU= golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7 h1:rTIdg5QFRR7XCaK4LCjBiPbx8j4DQRpdYMnGn/bJUEU= +golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/oauth2 v0.0.0-20170807180024-9a379c6b3e95/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -536,6 +551,8 @@ golang.org/x/sys v0.0.0-20190508220229-2d0786266e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3 h1:4y9KwBHBgBNwDbtu44R5o1fdOCQUEXhbk/P4A9WmJq0= golang.org/x/sys v0.0.0-20190801041406-cbf593c0f2f3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= @@ -572,6 +589,10 @@ gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= +gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= +gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= +gopkg.in/go-playground/validator.v9 v9.29.1 h1:SvGtYmN60a5CVKTOzMSyfzWDeZRxRuGvRQyEAKbw1xc= +gopkg.in/go-playground/validator.v9 v9.29.1/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= diff --git a/protocol/rest/rest_client/resty_client.go b/protocol/rest/rest_client/resty_client.go new file mode 100644 index 0000000000000000000000000000000000000000..cdfddcfbe5fd7e1ab239d774098c162bc1715f2e --- /dev/null +++ b/protocol/rest/rest_client/resty_client.go @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_client + +import ( + "context" + "net" + "net/http" + "path" + "time" +) + +import ( + "github.com/go-resty/resty/v2" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +func init() { + extension.SetRestClient(constant.DEFAULT_REST_CLIENT, GetRestyClient) +} + +type RestyClient struct { + client *resty.Client +} + +func NewRestyClient(restOption *rest_interface.RestOptions) *RestyClient { + client := resty.New() + client.SetTransport( + &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + c, err := net.DialTimeout(network, addr, restOption.ConnectTimeout) + if err != nil { + return nil, err + } + err = c.SetDeadline(time.Now().Add(restOption.RequestTimeout)) + if err != nil { + return nil, err + } + return c, nil + }, + }) + return &RestyClient{ + client: client, + } +} + +func (rc *RestyClient) Do(restRequest *rest_interface.RestRequest, res interface{}) error { + _, err := rc.client.R(). + SetHeader("Content-Type", restRequest.Consumes). + SetHeader("Accept", restRequest.Produces). + SetPathParams(restRequest.PathParams). + SetQueryParams(restRequest.QueryParams). + SetBody(restRequest.Body). + SetResult(res). + SetHeaders(restRequest.Headers). + Execute(restRequest.Method, "http://"+path.Join(restRequest.Location, restRequest.Path)) + return err +} + +func GetRestyClient(restOptions *rest_interface.RestOptions) rest_interface.RestClient { + return NewRestyClient(restOptions) +} diff --git a/protocol/rest/rest_config_initializer.go b/protocol/rest/rest_config_initializer.go new file mode 100644 index 0000000000000000000000000000000000000000..212e2ce596a607acb985ff0fd06282a64d1bd50c --- /dev/null +++ b/protocol/rest/rest_config_initializer.go @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "strconv" + "strings" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/config" + _ "github.com/apache/dubbo-go/protocol/rest/rest_config_reader" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +var ( + restConsumerConfig *rest_interface.RestConsumerConfig + restProviderConfig *rest_interface.RestProviderConfig + restConsumerServiceConfigMap map[string]*rest_interface.RestConfig + restProviderServiceConfigMap map[string]*rest_interface.RestConfig +) + +func init() { + initConsumerRestConfig() + initProviderRestConfig() +} + +func initConsumerRestConfig() { + consumerConfigType := config.GetConsumerConfig().RestConfigType + consumerConfigReader := extension.GetSingletonRestConfigReader(consumerConfigType) + restConsumerConfig = consumerConfigReader.ReadConsumerConfig() + if restConsumerConfig == nil { + return + } + restConsumerServiceConfigMap = make(map[string]*rest_interface.RestConfig, len(restConsumerConfig.RestConfigMap)) + for _, rc := range restConsumerConfig.RestConfigMap { + rc.Client = getNotEmptyStr(rc.Client, restConsumerConfig.Client, constant.DEFAULT_REST_CLIENT) + rc.RestMethodConfigsMap = initMethodConfigMap(rc, restConsumerConfig.Consumes, restConsumerConfig.Produces) + restConsumerServiceConfigMap[rc.InterfaceName] = rc + } +} + +func initProviderRestConfig() { + providerConfigType := config.GetProviderConfig().RestConfigType + providerConfigReader := extension.GetSingletonRestConfigReader(providerConfigType) + restProviderConfig = providerConfigReader.ReadProviderConfig() + if restProviderConfig == nil { + return + } + restProviderServiceConfigMap = make(map[string]*rest_interface.RestConfig, len(restProviderConfig.RestConfigMap)) + for _, rc := range restProviderConfig.RestConfigMap { + rc.Server = getNotEmptyStr(rc.Server, restProviderConfig.Server, constant.DEFAULT_REST_SERVER) + rc.RestMethodConfigsMap = initMethodConfigMap(rc, restProviderConfig.Consumes, restProviderConfig.Produces) + restProviderServiceConfigMap[rc.InterfaceName] = rc + } +} + +func initMethodConfigMap(rc *rest_interface.RestConfig, consumes string, produces string) map[string]*rest_interface.RestMethodConfig { + mcm := make(map[string]*rest_interface.RestMethodConfig, len(rc.RestMethodConfigs)) + for _, mc := range rc.RestMethodConfigs { + mc.InterfaceName = rc.InterfaceName + mc.Path = rc.Path + mc.Path + mc.Consumes = getNotEmptyStr(mc.Consumes, rc.Consumes, consumes) + mc.Produces = getNotEmptyStr(mc.Produces, rc.Produces, produces) + mc.MethodType = getNotEmptyStr(mc.MethodType, rc.MethodType) + mc = transformMethodConfig(mc) + mcm[mc.MethodName] = mc + } + return mcm +} + +func getNotEmptyStr(args ...string) string { + var r string + for _, t := range args { + if len(t) > 0 { + r = t + break + } + } + return r +} + +func transformMethodConfig(methodConfig *rest_interface.RestMethodConfig) *rest_interface.RestMethodConfig { + if len(methodConfig.PathParamsMap) == 0 && len(methodConfig.PathParams) > 0 { + paramsMap, err := parseParamsString2Map(methodConfig.PathParams) + if err != nil { + logger.Warnf("[Rest Config] Path Param parse error:%v", err) + } else { + methodConfig.PathParamsMap = paramsMap + } + } + if len(methodConfig.QueryParamsMap) == 0 && len(methodConfig.QueryParams) > 0 { + paramsMap, err := parseParamsString2Map(methodConfig.QueryParams) + if err != nil { + logger.Warnf("[Rest Config] Argument Param parse error:%v", err) + } else { + methodConfig.QueryParamsMap = paramsMap + } + } + if len(methodConfig.HeadersMap) == 0 && len(methodConfig.Headers) > 0 { + headersMap, err := parseParamsString2Map(methodConfig.Headers) + if err != nil { + logger.Warnf("[Rest Config] Argument Param parse error:%v", err) + } else { + methodConfig.HeadersMap = headersMap + } + } + return methodConfig +} + +func parseParamsString2Map(params string) (map[int]string, error) { + m := make(map[int]string) + for _, p := range strings.Split(params, ",") { + pa := strings.Split(p, ":") + key, err := strconv.Atoi(pa[0]) + if err != nil { + return nil, err + } + m[key] = pa[1] + } + return m, nil +} + +func GetRestConsumerServiceConfig(service string) *rest_interface.RestConfig { + return restConsumerServiceConfigMap[service] +} + +func GetRestProviderServiceConfig(service string) *rest_interface.RestConfig { + return restProviderServiceConfigMap[service] +} + +func SetRestConsumerServiceConfigMap(configMap map[string]*rest_interface.RestConfig) { + restConsumerServiceConfigMap = configMap +} + +func SetRestProviderServiceConfigMap(configMap map[string]*rest_interface.RestConfig) { + restProviderServiceConfigMap = configMap +} diff --git a/protocol/rest/rest_config_initializer_test.go b/protocol/rest/rest_config_initializer_test.go new file mode 100644 index 0000000000000000000000000000000000000000..ab17d0118ecd73c6980c50ec2e9c741fb1a7df16 --- /dev/null +++ b/protocol/rest/rest_config_initializer_test.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "os" + "testing" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/stretchr/testify/assert" +) + +func TestGetRestConsumerServiceConfig(t *testing.T) { + err := os.Setenv(constant.CONF_CONSUMER_FILE_PATH, "./rest_config_reader/testdata/consumer_config.yml") + assert.NoError(t, err) + initConsumerRestConfig() + serviceConfig := GetRestConsumerServiceConfig("com.ikurento.user.UserProvider") + assert.NotEmpty(t, serviceConfig) +} + +func TestGetRestProviderServiceConfig(t *testing.T) { + err := os.Setenv(constant.CONF_PROVIDER_FILE_PATH, "./rest_config_reader/testdata/provider_config.yml") + assert.NoError(t, err) + initProviderRestConfig() + serviceConfig := GetRestProviderServiceConfig("com.ikurento.user.UserProvider") + assert.NotEmpty(t, serviceConfig) +} diff --git a/protocol/rest/rest_config_reader/default_config_reader.go b/protocol/rest/rest_config_reader/default_config_reader.go new file mode 100644 index 0000000000000000000000000000000000000000..3eb29af8f31a50000ac6882806a12279c34dc24a --- /dev/null +++ b/protocol/rest/rest_config_reader/default_config_reader.go @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_config_reader + +import ( + "io/ioutil" + "os" + "path" +) + +import ( + perrors "github.com/pkg/errors" + "gopkg.in/yaml.v2" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +var ( + defaultConfigReader *DefaultConfigReader +) + +func init() { + extension.SetRestConfigReader(constant.DEFAULT_KEY, GetDefaultConfigReader) +} + +type DefaultConfigReader struct { +} + +func NewDefaultConfigReader() *DefaultConfigReader { + return &DefaultConfigReader{} +} + +func (dcr *DefaultConfigReader) ReadConsumerConfig() *rest_interface.RestConsumerConfig { + confConFile := os.Getenv(constant.CONF_CONSUMER_FILE_PATH) + if confConFile == "" { + logger.Warnf("rest consumer configure(consumer) file name is nil") + return nil + } + if path.Ext(confConFile) != ".yml" { + logger.Warnf("rest consumer configure file name{%v} suffix must be .yml", confConFile) + return nil + } + confFileStream, err := ioutil.ReadFile(confConFile) + if err != nil { + logger.Warnf("ioutil.ReadFile(file:%s) = error:%v", confConFile, perrors.WithStack(err)) + return nil + } + restConsumerConfig := &rest_interface.RestConsumerConfig{} + err = yaml.Unmarshal(confFileStream, restConsumerConfig) + if err != nil { + logger.Warnf("yaml.Unmarshal() = error:%v", perrors.WithStack(err)) + return nil + } + return restConsumerConfig +} + +func (dcr *DefaultConfigReader) ReadProviderConfig() *rest_interface.RestProviderConfig { + confProFile := os.Getenv(constant.CONF_PROVIDER_FILE_PATH) + if len(confProFile) == 0 { + logger.Warnf("rest provider configure(provider) file name is nil") + return nil + } + + if path.Ext(confProFile) != ".yml" { + logger.Warnf("rest provider configure file name{%v} suffix must be .yml", confProFile) + return nil + } + confFileStream, err := ioutil.ReadFile(confProFile) + if err != nil { + logger.Warnf("ioutil.ReadFile(file:%s) = error:%v", confProFile, perrors.WithStack(err)) + return nil + } + restProviderConfig := &rest_interface.RestProviderConfig{} + err = yaml.Unmarshal(confFileStream, restProviderConfig) + if err != nil { + logger.Warnf("yaml.Unmarshal() = error:%v", perrors.WithStack(err)) + return nil + } + + return restProviderConfig +} + +func GetDefaultConfigReader() rest_interface.RestConfigReader { + if defaultConfigReader == nil { + defaultConfigReader = NewDefaultConfigReader() + } + return defaultConfigReader +} diff --git a/protocol/rest/rest_config_reader/default_config_reader_test.go b/protocol/rest/rest_config_reader/default_config_reader_test.go new file mode 100644 index 0000000000000000000000000000000000000000..5990c5314e1531f89889ee115f514ed03cff3fc9 --- /dev/null +++ b/protocol/rest/rest_config_reader/default_config_reader_test.go @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_config_reader + +import ( + "os" + "testing" +) + +import ( + "github.com/apache/dubbo-go/common/constant" + "github.com/stretchr/testify/assert" +) + +func TestDefaultConfigReader_ReadConsumerConfig(t *testing.T) { + err := os.Setenv(constant.CONF_CONSUMER_FILE_PATH, "./testdata/consumer_config.yml") + assert.NoError(t, err) + reader := GetDefaultConfigReader() + config := reader.ReadConsumerConfig() + assert.NotEmpty(t, config) +} + +func TestDefaultConfigReader_ReadProviderConfig(t *testing.T) { + err := os.Setenv(constant.CONF_PROVIDER_FILE_PATH, "./testdata/provider_config.yml") + assert.NoError(t, err) + reader := GetDefaultConfigReader() + config := reader.ReadProviderConfig() + assert.NotEmpty(t, config) +} diff --git a/protocol/rest/rest_config_reader/testdata/consumer_config.yml b/protocol/rest/rest_config_reader/testdata/consumer_config.yml new file mode 100644 index 0000000000000000000000000000000000000000..c484a2bafd3d2850f23195111adb6263e30d9841 --- /dev/null +++ b/protocol/rest/rest_config_reader/testdata/consumer_config.yml @@ -0,0 +1,100 @@ +# dubbo client yaml configure file + +filter: "" + +# client +request_timeout : "100ms" +# connect timeout +connect_timeout : "100ms" +check: true +# application config +application: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info client" + version : "0.0.1" + owner : "ZX" + environment : "dev" + +registries : + + "hangzhouzk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + "shanghaizk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2182" + username: "" + password: "" + +references: + "UserProvider": + registry: "hangzhouzk,shanghaizk" + filter: "" + protocol : "dubbo" + version: "1.0" + group: "as" + interface : "com.ikurento.user.UserProvider" + url: "dubbo://127.0.0.1:20000/UserProvider" + cluster: "failover" + timeout: "3s" + methods : + - name: "GetUser" + retries: "3" + timeout: "5s" + rest_query_params: "1:userid,2:username" + rest_headrs: "3:age" + rest_path_params: "4:time,2:name" + params: + "serviceid": + "soa.com.ikurento.user.UserProvider" + "forks": 5 + +shutdown_conf: + timeout: 60s + step_timeout: 10s + +protocol_conf: + # when you choose the Dubbo protocol, the following configuration takes effect + dubbo: + reconnect_interval: 0 + # reconnect_interval is the actual number of connections a session can use + connection_number: 2 + # heartbeat_period is heartbeat interval between server and client connection. + # Effective by client configuration + heartbeat_period: "30s" + # when the session is inactive for more than session_timeout, the session may be closed + session_timeout: "30s" + # a reference has the size of the session connection pool + # that is the maximum number of sessions it may have + pool_size: 4 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty + pool_ttl: 600 + # gr_pool_size is recommended to be set to [cpu core number] * 100 + gr_pool_size: 1200 + # queue_len is recommended to be set to 64 or 128 + queue_len: 64 + # queue_number is recommended to be set to gr_pool_size / 20 + queue_number: 60 + # dubbo-go uses getty as the network connection library. + # The following is the relevant configuration of getty + getty_session_param: + compress_encoding: false + tcp_no_delay: true + tcp_keep_alive: true + keep_alive_period: "120s" + tcp_r_buf_size: 262144 + tcp_w_buf_size: 65536 + pkg_wq_size: 512 + tcp_read_timeout: "1s" + tcp_write_timeout: "5s" + wait_timeout: "1s" + # maximum len of data per request + # this refers to the total amount of data requested or returned + max_msg_len: 102400 + session_name: "client" diff --git a/protocol/rest/rest_config_reader/testdata/provider_config.yml b/protocol/rest/rest_config_reader/testdata/provider_config.yml new file mode 100644 index 0000000000000000000000000000000000000000..76c09cb1b8d91ff50ac37f553ef4486795b057cf --- /dev/null +++ b/protocol/rest/rest_config_reader/testdata/provider_config.yml @@ -0,0 +1,100 @@ +# dubbo server yaml configure file + +filter: "" +# application config +application: + organization : "ikurento.com" + name : "BDTService" + module : "dubbogo user-info server" + version : "0.0.1" + owner : "ZX" + environment : "dev" + +registries : + "hangzhouzk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2181" + username: "" + password: "" + "shanghaizk": + protocol: "zookeeper" + timeout : "3s" + address: "127.0.0.1:2182" + username: "" + password: "" + + +services: + "UserProvider": + registry: "hangzhouzk,shanghaizk" + filter: "" + # the name of limiter + tps.limiter: "default" + # the time unit of interval is ms + tps.limit.interval: 60000 + tps.limit.rate: 200 + # the name of strategy + tps.limit.strategy: "slidingWindow" + # the name of RejectedExecutionHandler + tps.limit.rejected.handler: "default" + # the concurrent request limitation of this service + # if the value < 0, it will not be limited. + execute.limit: "200" + # the name of RejectedExecutionHandler + execute.limit.rejected.handler: "default" + protocol : "dubbo" + # equivalent to interface of dubbo.xml + interface : "com.ikurento.user.UserProvider" + loadbalance: "random" + version: "1.0" + group: "as" + warmup: "100" + cluster: "failover" + methods: + - name: "GetUser" + retries: 1 + loadbalance: "random" + # the concurrent request limitation of this method + # if the value < 0, it will not be limited. + execute.limit: "200" + # the name of RejectedExecutionHandler + execute.limit.rejected.handler: "default" + +protocols: + "dubbo": + name: "dubbo" + # while using dubbo protocol, ip cannot is 127.0.0.1, because client of java-dubbo will get 'connection refuse' + ip : "127.0.0.1" + port : 20000 + #- name: "jsonrpc" + # ip: "127.0.0.1" + # port: 20001 + +shutdown_conf: + timeout: 60s + step_timeout: 10s + +protocol_conf: + dubbo: + session_number: 700 + session_timeout: "20s" + # gr_pool_size is recommended to be set to [cpu core number] * 10 + gr_pool_size: 120 + # queue_len is recommended to be set to 64 or 128 + queue_len: 64 + # queue_number is recommended to be set to gr_pool_size / 20 + queue_number: 6 + getty_session_param: + compress_encoding: false + tcp_no_delay: true + tcp_keep_alive: true + keep_alive_period: "120s" + tcp_r_buf_size: 262144 + tcp_w_buf_size: 65536 + pkg_wq_size: 512 + tcp_read_timeout: "1s" + tcp_write_timeout: "5s" + wait_timeout: "1s" + max_msg_len: 1024 + session_name: "server" diff --git a/protocol/rest/rest_exporter.go b/protocol/rest/rest_exporter.go new file mode 100644 index 0000000000000000000000000000000000000000..470d525ad806687e7a732ce5681eb372eb431a63 --- /dev/null +++ b/protocol/rest/rest_exporter.go @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "sync" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" +) + +type RestExporter struct { + protocol.BaseExporter +} + +func NewRestExporter(key string, invoker protocol.Invoker, exporterMap *sync.Map) *RestExporter { + return &RestExporter{ + BaseExporter: *protocol.NewBaseExporter(key, invoker, exporterMap), + } +} + +func (re *RestExporter) Unexport() { + serviceId := re.GetInvoker().GetUrl().GetParam(constant.BEAN_NAME_KEY, "") + re.BaseExporter.Unexport() + err := common.ServiceMap.UnRegister(REST, serviceId) + if err != nil { + logger.Errorf("[RestExporter.Unexport] error: %v", err) + } + return +} diff --git a/protocol/rest/rest_interface/rest_client.go b/protocol/rest/rest_interface/rest_client.go new file mode 100644 index 0000000000000000000000000000000000000000..80604d06030446045400f8e2ca0cdfebc74f5977 --- /dev/null +++ b/protocol/rest/rest_interface/rest_client.go @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_interface + +import ( + "time" +) + +type RestOptions struct { + RequestTimeout time.Duration + ConnectTimeout time.Duration +} + +type RestRequest struct { + Location string + Path string + Produces string + Consumes string + Method string + PathParams map[string]string + QueryParams map[string]string + Body interface{} + Headers map[string]string +} + +type RestClient interface { + Do(request *RestRequest, res interface{}) error +} diff --git a/protocol/rest/rest_interface/rest_config.go b/protocol/rest/rest_interface/rest_config.go new file mode 100644 index 0000000000000000000000000000000000000000..301220ec29146f4b6dfb9af7828cb7a8fc7d8e1b --- /dev/null +++ b/protocol/rest/rest_interface/rest_config.go @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_interface + +import "github.com/creasty/defaults" + +type RestConsumerConfig struct { + Client string `default:"resty" yaml:"rest_client" json:"rest_client,omitempty" property:"rest_client"` + Produces string `default:"application/json" yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `default:"application/json" yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + RestConfigMap map[string]*RestConfig `yaml:"references" json:"references,omitempty" property:"references"` +} + +// UnmarshalYAML ... +func (c *RestConsumerConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestConsumerConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +type RestProviderConfig struct { + Server string `default:"go-restful" yaml:"rest_server" json:"rest_server,omitempty" property:"rest_server"` + Produces string `default:"application/json" yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `default:"application/json" yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + RestConfigMap map[string]*RestConfig `yaml:"services" json:"services,omitempty" property:"services"` +} + +// UnmarshalYAML ... +func (c *RestProviderConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestProviderConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +type RestConfig struct { + InterfaceName string `required:"true" yaml:"interface" json:"interface,omitempty" property:"interface"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Path string `yaml:"rest_path" json:"rest_path,omitempty" property:"rest_path"` + Produces string `default:"application/json" yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `default:"application/json" yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + MethodType string `yaml:"rest_method" json:"rest_method,omitempty" property:"rest_method"` + Client string `yaml:"rest_client" json:"rest_client,omitempty" property:"rest_client"` + Server string `yaml:"rest_server" json:"rest_server,omitempty" property:"rest_server"` + RestMethodConfigs []*RestMethodConfig `yaml:"methods" json:"methods,omitempty" property:"methods"` + RestMethodConfigsMap map[string]*RestMethodConfig +} + +// UnmarshalYAML ... +func (c *RestConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} + +type RestMethodConfig struct { + InterfaceName string + MethodName string `required:"true" yaml:"name" json:"name,omitempty" property:"name"` + Url string `yaml:"url" json:"url,omitempty" property:"url"` + Path string `yaml:"rest_path" json:"rest_path,omitempty" property:"rest_path"` + Produces string `yaml:"rest_produces" json:"rest_produces,omitempty" property:"rest_produces"` + Consumes string `yaml:"rest_consumes" json:"rest_consumes,omitempty" property:"rest_consumes"` + MethodType string `yaml:"rest_method" json:"rest_method,omitempty" property:"rest_method"` + PathParams string `yaml:"rest_path_params" json:"rest_path_params,omitempty" property:"rest_path_params"` + PathParamsMap map[int]string + QueryParams string `yaml:"rest_query_params" json:"rest_query_params,omitempty" property:"rest_query_params"` + QueryParamsMap map[int]string + Body int `default:"-1" yaml:"rest_body" json:"rest_body,omitempty" property:"rest_body"` + Headers string `yaml:"rest_headers" json:"rest_headers,omitempty" property:"rest_headers"` + HeadersMap map[int]string +} + +// UnmarshalYAML ... +func (c *RestMethodConfig) UnmarshalYAML(unmarshal func(interface{}) error) error { + if err := defaults.Set(c); err != nil { + return err + } + type plain RestMethodConfig + if err := unmarshal((*plain)(c)); err != nil { + return err + } + return nil +} diff --git a/protocol/rest/rest_interface/rest_config_reader.go b/protocol/rest/rest_interface/rest_config_reader.go new file mode 100644 index 0000000000000000000000000000000000000000..d2222c260784468eac06a576518f07b4b7eb9676 --- /dev/null +++ b/protocol/rest/rest_interface/rest_config_reader.go @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_interface + +type RestConfigReader interface { + ReadConsumerConfig() *RestConsumerConfig + ReadProviderConfig() *RestProviderConfig +} diff --git a/protocol/rest/rest_interface/rest_server.go b/protocol/rest/rest_interface/rest_server.go new file mode 100644 index 0000000000000000000000000000000000000000..f0c9a5f31613823b756af23b7a260af5fa5c9f06 --- /dev/null +++ b/protocol/rest/rest_interface/rest_server.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_interface + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" +) + +type RestServer interface { + Start(url common.URL) + Deploy(invoker protocol.Invoker, restMethodConfig map[string]*RestMethodConfig) + UnDeploy(restMethodConfig map[string]*RestMethodConfig) + Destroy() +} diff --git a/protocol/rest/rest_invoker.go b/protocol/rest/rest_invoker.go new file mode 100644 index 0000000000000000000000000000000000000000..83ce07323de1fe87d8e89528ac87afe4b5816d25 --- /dev/null +++ b/protocol/rest/rest_invoker.go @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "fmt" +) + +import ( + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/protocol" + invocation_impl "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +type RestInvoker struct { + protocol.BaseInvoker + client rest_interface.RestClient + restMethodConfigMap map[string]*rest_interface.RestMethodConfig +} + +func NewRestInvoker(url common.URL, client *rest_interface.RestClient, restMethodConfig map[string]*rest_interface.RestMethodConfig) *RestInvoker { + return &RestInvoker{ + BaseInvoker: *protocol.NewBaseInvoker(url), + client: *client, + restMethodConfigMap: restMethodConfig, + } +} + +func (ri *RestInvoker) Invoke(ctx context.Context, invocation protocol.Invocation) protocol.Result { + inv := invocation.(*invocation_impl.RPCInvocation) + methodConfig := ri.restMethodConfigMap[inv.MethodName()] + var ( + result protocol.RPCResult + body interface{} + pathParams map[string]string + queryParams map[string]string + headers map[string]string + err error + ) + if methodConfig == nil { + result.Err = perrors.Errorf("[RestInvoker] Rest methodConfig:%s is nil", inv.MethodName()) + return &result + } + if pathParams, err = restStringMapTransform(methodConfig.PathParamsMap, inv.Arguments()); err != nil { + result.Err = err + return &result + } + if queryParams, err = restStringMapTransform(methodConfig.QueryParamsMap, inv.Arguments()); err != nil { + result.Err = err + return &result + } + if headers, err = restStringMapTransform(methodConfig.HeadersMap, inv.Arguments()); err != nil { + result.Err = err + return &result + } + if len(inv.Arguments()) > methodConfig.Body && methodConfig.Body >= 0 { + body = inv.Arguments()[methodConfig.Body] + } + + req := &rest_interface.RestRequest{ + Location: ri.GetUrl().Location, + Produces: methodConfig.Produces, + Consumes: methodConfig.Consumes, + Method: methodConfig.MethodType, + Path: methodConfig.Path, + PathParams: pathParams, + QueryParams: queryParams, + Body: body, + Headers: headers, + } + result.Err = ri.client.Do(req, inv.Reply()) + if result.Err == nil { + result.Rest = inv.Reply() + } + return &result +} + +func restStringMapTransform(paramsMap map[int]string, args []interface{}) (map[string]string, error) { + resMap := make(map[string]string, len(paramsMap)) + for k, v := range paramsMap { + if k < len(args) && k >= 0 { + resMap[v] = fmt.Sprint(args[k]) + } else { + return nil, perrors.Errorf("[Rest Invoke] Index %v is out of bundle", k) + } + } + return resMap, nil +} diff --git a/protocol/rest/rest_invoker_test.go b/protocol/rest/rest_invoker_test.go new file mode 100644 index 0000000000000000000000000000000000000000..c4090faddf826ce004ab9fe6fe2b55948c75b068 --- /dev/null +++ b/protocol/rest/rest_invoker_test.go @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/rest/rest_client" + _ "github.com/apache/dubbo-go/protocol/rest/rest_config_reader" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +func TestRestInvoker_Invoke(t *testing.T) { + // Refer + proto := GetRestProtocol() + defer proto.Destroy() + url, err := common.NewURL(context.Background(), "rest://127.0.0.1:8877/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + _, err = common.ServiceMap.Register(url.Protocol, &UserProvider{}) + assert.NoError(t, err) + con := config.ProviderConfig{} + config.SetProviderConfig(con) + configMap := make(map[string]*rest_interface.RestConfig) + methodConfigMap := make(map[string]*rest_interface.RestMethodConfig) + queryParamsMap := make(map[int]string) + queryParamsMap[1] = "age" + queryParamsMap[2] = "name" + pathParamsMap := make(map[int]string) + pathParamsMap[0] = "userid" + headersMap := make(map[int]string) + headersMap[3] = "Content-Type" + methodConfigMap["GetUserOne"] = &rest_interface.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUserOne", + Path: "/GetUserOne", + Produces: "application/json", + Consumes: "application/json", + MethodType: "POST", + PathParams: "", + PathParamsMap: nil, + QueryParams: "", + QueryParamsMap: nil, + Body: 0, + } + methodConfigMap["GetUser"] = &rest_interface.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUser", + Path: "/GetUser/{userid}", + Produces: "application/json", + Consumes: "application/json", + MethodType: "GET", + PathParams: "", + PathParamsMap: pathParamsMap, + QueryParams: "", + QueryParamsMap: queryParamsMap, + Body: -1, + HeadersMap: headersMap, + } + + configMap["com.ikurento.user.UserProvider"] = &rest_interface.RestConfig{ + Server: "go-restful", + RestMethodConfigsMap: methodConfigMap, + } + SetRestProviderServiceConfigMap(configMap) + proxyFactory := extension.GetProxyFactory("default") + proto.Export(proxyFactory.GetInvoker(url)) + time.Sleep(5 * time.Second) + configMap = make(map[string]*rest_interface.RestConfig) + configMap["com.ikurento.user.UserProvider"] = &rest_interface.RestConfig{ + RestMethodConfigsMap: methodConfigMap, + } + restClient := rest_client.GetRestyClient(&rest_interface.RestOptions{ConnectTimeout: 3 * time.Second, RequestTimeout: 3 * time.Second}) + invoker := NewRestInvoker(url, &restClient, methodConfigMap) + user := &User{} + inv := invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUser"), + invocation.WithArguments([]interface{}{1, int32(23), "username", "application/json"}), invocation.WithReply(user)) + res := invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.Equal(t, User{Id: 1, Age: int32(23), Name: "username"}, *res.Result().(*User)) + time.Sleep(3 * time.Second) + now := time.Now() + inv = invocation.NewRPCInvocationWithOptions(invocation.WithMethodName("GetUserOne"), + invocation.WithArguments([]interface{}{&User{1, &now, int32(23), "username"}}), invocation.WithReply(user)) + res = invoker.Invoke(context.Background(), inv) + assert.NoError(t, res.Error()) + assert.Equal(t, 1, res.Result().(*User).Id) + assert.Equal(t, now.Unix(), res.Result().(*User).Time.Unix()) + assert.Equal(t, int32(23), res.Result().(*User).Age) + assert.Equal(t, "username", res.Result().(*User).Name) + err = common.ServiceMap.UnRegister(url.Protocol, "com.ikurento.user.UserProvider") + assert.NoError(t, err) +} diff --git a/protocol/rest/rest_protocol.go b/protocol/rest/rest_protocol.go new file mode 100644 index 0000000000000000000000000000000000000000..8002fef4d807b8f437c0635b5482581033bb9943 --- /dev/null +++ b/protocol/rest/rest_protocol.go @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "strings" + "sync" + "time" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" + _ "github.com/apache/dubbo-go/protocol/rest/rest_server" +) + +var ( + restProtocol *RestProtocol +) + +const REST = "rest" + +func init() { + extension.SetProtocol(REST, GetRestProtocol) +} + +type RestProtocol struct { + protocol.BaseProtocol + serverMap map[string]rest_interface.RestServer + clientMap map[rest_interface.RestOptions]rest_interface.RestClient + serverLock sync.Mutex + clientLock sync.Mutex +} + +func NewRestProtocol() *RestProtocol { + return &RestProtocol{ + BaseProtocol: protocol.NewBaseProtocol(), + serverMap: make(map[string]rest_interface.RestServer), + clientMap: make(map[rest_interface.RestOptions]rest_interface.RestClient), + } +} + +func (rp *RestProtocol) Export(invoker protocol.Invoker) protocol.Exporter { + url := invoker.GetUrl() + serviceKey := strings.TrimPrefix(url.Path, "/") + exporter := NewRestExporter(serviceKey, invoker, rp.ExporterMap()) + restConfig := GetRestProviderServiceConfig(url.Service()) + rp.SetExporterMap(serviceKey, exporter) + restServer := rp.getServer(url, restConfig) + restServer.Deploy(invoker, restConfig.RestMethodConfigsMap) + return exporter +} + +func (rp *RestProtocol) Refer(url common.URL) protocol.Invoker { + // create rest_invoker + var requestTimeout = config.GetConsumerConfig().RequestTimeout + requestTimeoutStr := url.GetParam(constant.TIMEOUT_KEY, config.GetConsumerConfig().Request_Timeout) + connectTimeout := config.GetConsumerConfig().ConnectTimeout + if t, err := time.ParseDuration(requestTimeoutStr); err == nil { + requestTimeout = t + } + restConfig := GetRestConsumerServiceConfig(url.Service()) + restOptions := rest_interface.RestOptions{RequestTimeout: requestTimeout, ConnectTimeout: connectTimeout} + restClient := rp.getClient(restOptions, restConfig) + invoker := NewRestInvoker(url, &restClient, restConfig.RestMethodConfigsMap) + rp.SetInvokers(invoker) + return invoker +} + +func (rp *RestProtocol) getServer(url common.URL, restConfig *rest_interface.RestConfig) rest_interface.RestServer { + restServer, ok := rp.serverMap[url.Location] + if !ok { + _, ok := rp.ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + if !ok { + panic("[RestProtocol]" + url.Key() + "is not existing") + } + rp.serverLock.Lock() + restServer, ok = rp.serverMap[url.Location] + if !ok { + restServer = extension.GetNewRestServer(restConfig.Server) + restServer.Start(url) + rp.serverMap[url.Location] = restServer + } + rp.serverLock.Unlock() + + } + return restServer +} + +func (rp *RestProtocol) getClient(restOptions rest_interface.RestOptions, restConfig *rest_interface.RestConfig) rest_interface.RestClient { + restClient, ok := rp.clientMap[restOptions] + rp.clientLock.Lock() + if !ok { + restClient, ok = rp.clientMap[restOptions] + if !ok { + restClient = extension.GetNewRestClient(restConfig.Client, &restOptions) + rp.clientMap[restOptions] = restClient + } + } + rp.clientLock.Unlock() + return restClient +} + +func (rp *RestProtocol) Destroy() { + // destroy rest_server + rp.BaseProtocol.Destroy() + for key, server := range rp.serverMap { + server.Destroy() + delete(rp.serverMap, key) + } + for key := range rp.clientMap { + delete(rp.clientMap, key) + } +} + +func GetRestProtocol() protocol.Protocol { + if restProtocol == nil { + restProtocol = NewRestProtocol() + } + return restProtocol +} diff --git a/protocol/rest/rest_protocol_test.go b/protocol/rest/rest_protocol_test.go new file mode 100644 index 0000000000000000000000000000000000000000..cee934b72984abeb787701427ae7216e54608c81 --- /dev/null +++ b/protocol/rest/rest_protocol_test.go @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest + +import ( + "context" + "fmt" + "strings" + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/extension" + _ "github.com/apache/dubbo-go/common/proxy/proxy_factory" + "github.com/apache/dubbo-go/config" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +func TestRestProtocol_Refer(t *testing.T) { + // Refer + proto := GetRestProtocol() + url, err := common.NewURL(context.Background(), "rest://127.0.0.1:20000/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + con := config.ConsumerConfig{ + ConnectTimeout: 5 * time.Second, + RequestTimeout: 5 * time.Second, + } + config.SetConsumerConfig(con) + configMap := make(map[string]*rest_interface.RestConfig) + configMap["com.ikurento.user.UserProvider"] = &rest_interface.RestConfig{ + Client: "resty", + } + SetRestConsumerServiceConfigMap(configMap) + invoker := proto.Refer(url) + + // make sure url + eq := invoker.GetUrl().URLEqual(url) + assert.True(t, eq) + + // make sure invokers after 'Destroy' + invokersLen := len(proto.(*RestProtocol).Invokers()) + assert.Equal(t, 1, invokersLen) + proto.Destroy() + invokersLen = len(proto.(*RestProtocol).Invokers()) + assert.Equal(t, 0, invokersLen) +} + +func TestJsonrpcProtocol_Export(t *testing.T) { + // Export + proto := GetRestProtocol() + url, err := common.NewURL(context.Background(), "rest://127.0.0.1:8888/com.ikurento.user.UserProvider?anyhost=true&"+ + "application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&"+ + "environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&"+ + "module=dubbogo+user-info+server&org=ikurento.com&owner=ZX&pid=1447&revision=0.0.1&"+ + "side=provider&timeout=3000×tamp=1556509797245") + assert.NoError(t, err) + _, err = common.ServiceMap.Register(url.Protocol, &UserProvider{}) + assert.NoError(t, err) + con := config.ProviderConfig{} + config.SetProviderConfig(con) + configMap := make(map[string]*rest_interface.RestConfig) + methodConfigMap := make(map[string]*rest_interface.RestMethodConfig) + queryParamsMap := make(map[int]string) + queryParamsMap[1] = "age" + queryParamsMap[2] = "name" + pathParamsMap := make(map[int]string) + pathParamsMap[0] = "userid" + methodConfigMap["GetUser"] = &rest_interface.RestMethodConfig{ + InterfaceName: "", + MethodName: "GetUser", + Path: "/GetUser/{userid}", + Produces: "application/json", + Consumes: "application/json", + MethodType: "GET", + PathParams: "", + PathParamsMap: pathParamsMap, + QueryParams: "", + QueryParamsMap: queryParamsMap, + Body: -1, + } + configMap["com.ikurento.user.UserProvider"] = &rest_interface.RestConfig{ + Server: "go-restful", + RestMethodConfigsMap: methodConfigMap, + } + SetRestProviderServiceConfigMap(configMap) + proxyFactory := extension.GetProxyFactory("default") + exporter := proto.Export(proxyFactory.GetInvoker(url)) + // make sure url + eq := exporter.GetInvoker().GetUrl().URLEqual(url) + assert.True(t, eq) + // make sure exporterMap after 'Unexport' + fmt.Println(url.Path) + _, ok := proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + assert.True(t, ok) + exporter.Unexport() + _, ok = proto.(*RestProtocol).ExporterMap().Load(strings.TrimPrefix(url.Path, "/")) + assert.False(t, ok) + + // make sure serverMap after 'Destroy' + _, ok = proto.(*RestProtocol).serverMap[url.Location] + assert.True(t, ok) + proto.Destroy() + _, ok = proto.(*RestProtocol).serverMap[url.Location] + assert.False(t, ok) + err = common.ServiceMap.UnRegister(url.Protocol, "com.ikurento.user.UserProvider") + assert.NoError(t, err) +} + +type UserProvider struct { +} + +func (p *UserProvider) Reference() string { + return "com.ikurento.user.UserProvider" +} + +func (p *UserProvider) GetUser(ctx context.Context, id int, age int32, name string, contentType string) (*User, error) { + return &User{ + Id: id, + Time: nil, + Age: age, + Name: name, + }, nil +} + +func (p *UserProvider) GetUserOne(ctx context.Context, user *User) (*User, error) { + return user, nil +} + +type User struct { + Id int + Time *time.Time + Age int32 + Name string +} diff --git a/protocol/rest/rest_server/go_restful_server.go b/protocol/rest/rest_server/go_restful_server.go new file mode 100644 index 0000000000000000000000000000000000000000..d388aeeb3d69722174e3f6957143d2b7d3147ee8 --- /dev/null +++ b/protocol/rest/rest_server/go_restful_server.go @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rest_server + +import ( + "context" + "fmt" + "net" + "net/http" + "reflect" + "strconv" + "strings" + "time" +) + +import ( + "github.com/emicklei/go-restful/v3" + perrors "github.com/pkg/errors" +) + +import ( + "github.com/apache/dubbo-go/common" + "github.com/apache/dubbo-go/common/constant" + "github.com/apache/dubbo-go/common/extension" + "github.com/apache/dubbo-go/common/logger" + "github.com/apache/dubbo-go/protocol" + "github.com/apache/dubbo-go/protocol/invocation" + "github.com/apache/dubbo-go/protocol/rest/rest_interface" +) + +func init() { + extension.SetRestServer(constant.DEFAULT_REST_SERVER, GetNewGoRestfulServer) +} + +type GoRestfulServer struct { + srv *http.Server + container *restful.Container +} + +func NewGoRestfulServer() *GoRestfulServer { + return &GoRestfulServer{} +} + +func (grs *GoRestfulServer) Start(url common.URL) { + grs.container = restful.NewContainer() + grs.srv = &http.Server{ + Handler: grs.container, + } + ln, err := net.Listen("tcp", url.Location) + if err != nil { + panic(perrors.New(fmt.Sprintf("Restful Server start error:%v", err))) + } + + go func() { + err := grs.srv.Serve(ln) + if err != nil && err != http.ErrServerClosed { + logger.Errorf("[Go Restful] http.server.Serve(addr{%s}) = err{%+v}", url.Location, err) + } + }() +} + +func (grs *GoRestfulServer) Deploy(invoker protocol.Invoker, restMethodConfig map[string]*rest_interface.RestMethodConfig) { + svc := common.ServiceMap.GetService(invoker.GetUrl().Protocol, strings.TrimPrefix(invoker.GetUrl().Path, "/")) + for methodName, config := range restMethodConfig { + // get method + method := svc.Method()[methodName] + types := method.ArgsType() + ws := new(restful.WebService) + ws.Path(config.Path). + Produces(strings.Split(config.Produces, ",")...). + Consumes(strings.Split(config.Consumes, ",")...). + Route(ws.Method(config.MethodType).To(getFunc(methodName, invoker, types, config))) + grs.container.Add(ws) + } + +} + +func getFunc(methodName string, invoker protocol.Invoker, types []reflect.Type, config *rest_interface.RestMethodConfig) func(req *restful.Request, resp *restful.Response) { + return func(req *restful.Request, resp *restful.Response) { + var ( + err error + args []interface{} + ) + args = getArgsFromRequest(req, types, config) + result := invoker.Invoke(context.Background(), invocation.NewRPCInvocation(methodName, args, make(map[string]string, 0))) + if result.Error() != nil { + err = resp.WriteError(http.StatusInternalServerError, result.Error()) + if err != nil { + logger.Errorf("[Go Restful] WriteError error:%v", err) + } + return + } + err = resp.WriteEntity(result.Result()) + if err != nil { + logger.Error("[Go Restful] WriteEntity error:%v", err) + } + } +} +func (grs *GoRestfulServer) UnDeploy(restMethodConfig map[string]*rest_interface.RestMethodConfig) { + for _, config := range restMethodConfig { + ws := new(restful.WebService) + ws.Path(config.Path) + err := grs.container.Remove(ws) + if err != nil { + logger.Warnf("[Go restful] Remove web service error:%v", err) + } + } +} + +func (grs *GoRestfulServer) Destroy() { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := grs.srv.Shutdown(ctx); err != nil { + logger.Errorf("[Go Restful] Server Shutdown:", err) + } + logger.Infof("[Go Restful] Server exiting") +} + +func getArgsFromRequest(req *restful.Request, types []reflect.Type, config *rest_interface.RestMethodConfig) []interface{} { + args := make([]interface{}, len(types)) + for i, t := range types { + args[i] = reflect.Zero(t).Interface() + } + var ( + err error + param interface{} + i64 int64 + ) + for k, v := range config.PathParamsMap { + if k < 0 || k >= len(types) { + logger.Errorf("[Go restful] Path param parse error, the args:%v doesn't exist", k) + continue + } + t := types[k] + kind := t.Kind() + if kind == reflect.Ptr { + t = t.Elem() + } + if kind == reflect.Int { + param, err = strconv.Atoi(req.PathParameter(v)) + } else if kind == reflect.Int32 { + i64, err = strconv.ParseInt(req.PathParameter(v), 10, 32) + if err == nil { + param = int32(i64) + } + } else if kind == reflect.Int64 { + param, err = strconv.ParseInt(req.PathParameter(v), 10, 64) + } else if kind != reflect.String { + logger.Warnf("[Go restful] Path param parse error, the args:%v of type isn't int or string", k) + continue + } + if err != nil { + logger.Errorf("[Go restful] Path param parse error, error is %v", err) + continue + } + args[k] = param + } + for k, v := range config.QueryParamsMap { + if k < 0 || k > len(types) { + logger.Errorf("[Go restful] Query param parse error, the args:%v doesn't exist", k) + continue + } + t := types[k] + kind := t.Kind() + if kind == reflect.Ptr { + t = t.Elem() + } + if kind == reflect.Slice { + param = req.QueryParameters(v) + } else if kind == reflect.String { + param = req.QueryParameter(v) + } else if kind == reflect.Int { + param, err = strconv.Atoi(req.QueryParameter(v)) + } else if kind == reflect.Int32 { + i64, err = strconv.ParseInt(req.QueryParameter(v), 10, 32) + if err == nil { + param = int32(i64) + } + } else if kind == reflect.Int64 { + param, err = strconv.ParseInt(req.QueryParameter(v), 10, 64) + } else { + logger.Errorf("[Go restful] Query param parse error, the args:%v of type isn't int or string or slice", k) + continue + } + if err != nil { + logger.Errorf("[Go restful] Query param parse error, error is %v", err) + continue + } + args[k] = param + } + + if config.Body >= 0 && config.Body < len(types) { + t := types[config.Body] + kind := t.Kind() + if kind == reflect.Ptr { + t = t.Elem() + } + n := reflect.New(t) + if n.CanInterface() { + ni := n.Interface() + if err := req.ReadEntity(ni); err != nil { + logger.Errorf("[Go restful] Read body entity error:%v", err) + } else { + args[config.Body] = ni + } + } + + } + for k, v := range config.HeadersMap { + param := req.HeaderParameter(v) + if k < 0 || k >= len(types) { + logger.Errorf("[Go restful] Header param parse error, the args:%v doesn't exist", k) + continue + } + t := types[k] + if t.Kind() == reflect.Ptr { + t = t.Elem() + } + if t.Kind() == reflect.String { + args[k] = param + } else { + logger.Errorf("[Go restful] Header param parse error, the args:%v of type isn't string", k) + } + } + + return args +} + +func GetNewGoRestfulServer() rest_interface.RestServer { + return NewGoRestfulServer() +} diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 0c70caa7fa9f8c6283616d66ffd459349833df39..594d87b14ca932c3a2e8c1e271757c9d94d93bb8 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -44,6 +44,7 @@ const ( var ( errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil") errNilChildren = perrors.Errorf("has none children") + errNilNode = perrors.Errorf("node does not exist") ) // ZookeeperClient ... @@ -260,11 +261,13 @@ LOOP: logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name) z.stop() z.Lock() - if z.Conn != nil { - z.Conn.Close() - z.Conn = nil - } + conn := z.Conn + z.Conn = nil z.Unlock() + if conn != nil { + conn.Close() + } + break LOOP case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged): logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path) @@ -379,11 +382,13 @@ func (z *ZookeeperClient) Close() { z.stop() z.Wait.Wait() z.Lock() - if z.Conn != nil { - z.Conn.Close() - z.Conn = nil - } + conn := z.Conn + z.Conn = nil z.Unlock() + if conn != nil { + conn.Close() + } + logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs) } @@ -399,10 +404,12 @@ func (z *ZookeeperClient) Create(basePath string) error { tmpPath = path.Join(tmpPath, "/", str) err = errNilZkClientConn z.Lock() - if z.Conn != nil { - _, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) - } + conn := z.Conn z.Unlock() + if conn != nil { + _, err = conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll)) + } + if err != nil { if err == zk.ErrNodeExists { logger.Infof("zk.create(\"%s\") exists\n", tmpPath) @@ -424,10 +431,11 @@ func (z *ZookeeperClient) Delete(basePath string) error { err = errNilZkClientConn z.Lock() - if z.Conn != nil { - err = z.Conn.Delete(basePath, -1) - } + conn := z.Conn z.Unlock() + if conn != nil { + err = conn.Delete(basePath, -1) + } return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath) } @@ -445,10 +453,12 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er data = []byte("") zkPath = path.Join(basePath) + "/" + node z.Lock() - if z.Conn != nil { - tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) - } + conn := z.Conn z.Unlock() + if conn != nil { + tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + } + //if err != nil && err != zk.ErrNodeExists { if err != nil { logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err)) @@ -468,15 +478,17 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, err = errNilZkClientConn z.Lock() - if z.Conn != nil { - tmpPath, err = z.Conn.Create( + conn := z.Conn + z.Unlock() + if conn != nil { + tmpPath, err = conn.Create( path.Join(basePath)+"/", data, zk.FlagEphemeral|zk.FlagSequence, zk.WorldACL(zk.PermAll), ) } - z.Unlock() + logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath) if err != nil && err != zk.ErrNodeExists { logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n", @@ -499,19 +511,24 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, err = errNilZkClientConn z.Lock() - if z.Conn != nil { - children, stat, event, err = z.Conn.ChildrenW(path) - } + conn := z.Conn z.Unlock() + if conn != nil { + children, stat, event, err = conn.ChildrenW(path) + } + if err != nil { + if err == zk.ErrNoChildrenForEphemerals { + return nil, nil, errNilChildren + } if err == zk.ErrNoNode { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, errNilNode } logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err) return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path) } if stat == nil { - return nil, nil, perrors.Errorf("path{%s} has none children", path) + return nil, nil, perrors.Errorf("path{%s} get stat is nil", path) } if len(children) == 0 { return nil, nil, errNilChildren @@ -530,10 +547,12 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) { err = errNilZkClientConn z.Lock() - if z.Conn != nil { - children, stat, err = z.Conn.Children(path) - } + conn := z.Conn z.Unlock() + if conn != nil { + children, stat, err = conn.Children(path) + } + if err != nil { if err == zk.ErrNoNode { return nil, perrors.Errorf("path{%s} has none children", path) @@ -561,10 +580,12 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) { err = errNilZkClientConn z.Lock() - if z.Conn != nil { - exist, _, event, err = z.Conn.ExistsW(zkPath) - } + conn := z.Conn z.Unlock() + if conn != nil { + exist, _, event, err = conn.ExistsW(zkPath) + } + if err != nil { logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err)) return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath) diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 6850aa3ffac28fbf52c2924649ef8ed1b149f88b..43ee54f81f71ff74064aa5756ea11c70ba2055fa 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -190,7 +190,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi if MaxFailTimes <= failTimes { failTimes = MaxFailTimes } - logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err) + logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err) // clear the event channel CLEAR: for { @@ -201,6 +201,11 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi } } l.client.RegisterEvent(zkPath, &event) + if err == errNilNode { + logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath) + l.client.UnregisterEvent(zkPath, &event) + return + } select { case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): l.client.UnregisterEvent(zkPath, &event)