diff --git a/protocol/jsonrpc/server.go b/protocol/jsonrpc/server.go index ec247583693c890b21414ee4e3a7a04e14b20167..5b5548067225bcf6d8bcbaf35cee63c829c03edc 100644 --- a/protocol/jsonrpc/server.go +++ b/protocol/jsonrpc/server.go @@ -97,7 +97,7 @@ func (s *Server) handlePkg(conn net.Conn) { ContentLength: int64(len(body)), Body: ioutil.NopCloser(bytes.NewReader(body)), } - rsp.Header.Del("Content-Protocol") + rsp.Header.Del("Content-Type") rsp.Header.Del("Content-Length") rsp.Header.Del("Timeout") @@ -136,10 +136,10 @@ func (s *Server) handlePkg(conn net.Conn) { reqHeader["HttpMethod"] = r.Method httpTimeout := s.timeout - contentType := reqHeader["Content-Protocol"] + contentType := reqHeader["Content-Type"] if contentType != "application/json" && contentType != "application/json-rpc" { setTimeout(conn, httpTimeout) - r.Header.Set("Content-Protocol", "text/plain") + r.Header.Set("Content-Type", "text/plain") if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil { logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s", r.Header, perrors.WithStack(err), errRsp) @@ -254,7 +254,7 @@ func serveRequest(ctx context.Context, ContentLength: int64(len(body)), Body: ioutil.NopCloser(bytes.NewReader(body)), } - rsp.Header.Del("Content-Protocol") + rsp.Header.Del("Content-Type") rsp.Header.Del("Content-Length") rsp.Header.Del("Timeout") for k, v := range header { @@ -278,7 +278,7 @@ func serveRequest(ctx context.Context, ContentLength: int64(len(body)), Body: ioutil.NopCloser(bytes.NewReader(body)), } - rsp.Header.Del("Content-Protocol") + rsp.Header.Del("Content-Type") rsp.Header.Del("Content-Length") rsp.Header.Del("Timeout") for k, v := range header { @@ -424,7 +424,7 @@ func serveRequest(ctx context.Context, ContentLength: int64(len(rspStream)), Body: ioutil.NopCloser(bytes.NewReader(rspStream)), } - delete(header, "Content-Protocol") + delete(header, "Content-Type") delete(header, "Content-Length") delete(header, "Timeout") for k, v := range header { diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go new file mode 100644 index 0000000000000000000000000000000000000000..46edd49bf7cd22011bd7dfc8b89227257c5ad2ab --- /dev/null +++ b/remoting/zookeeper/facade_test.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "context" + "sync" + "testing" + "time" +) +import ( + "github.com/samuel/go-zookeeper/zk" + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/common" +) + +type mockFacade struct { + client *ZookeeperClient + cltLock sync.Mutex + wg sync.WaitGroup + URL *common.URL + done chan struct{} +} + +func (r *mockFacade) ZkClient() *ZookeeperClient { + return r.client +} + +func (r *mockFacade) SetZkClient(client *ZookeeperClient) { + r.client = client +} + +func (r *mockFacade) ZkClientLock() *sync.Mutex { + return &r.cltLock +} + +func (r *mockFacade) WaitGroup() *sync.WaitGroup { + return &r.wg +} + +func (r *mockFacade) GetDone() chan struct{} { + return r.done +} + +func (r *mockFacade) GetUrl() common.URL { + return *r.URL +} + +func (r *mockFacade) Destroy() { + close(r.done) + r.wg.Wait() +} + +func (r *mockFacade) RestartCallBack() bool { + return true +} +func (r *mockFacade) IsAvailable() bool { + return true +} + +func Test_Fascade(t *testing.T) { + ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + defer ts.Stop() + url, _ := common.NewURL(context.Background(), "mock://127.0.0.1") + mock := &mockFacade{client: z, URL: &url} + go HandleClientRestart(mock) + states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession} + verifyEventStateOrder(t, event, states, "event channel") + z.Close() + verifyEventStateOrder(t, event, []zk.State{zk.StateDisconnected}, "event channel") + //time.Sleep(2e9) +} diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b1f0d43d3429a38d8d5e9b5ad804dfecd2ee53c1 --- /dev/null +++ b/remoting/zookeeper/listener_test.go @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package zookeeper + +import ( + "fmt" + "testing" + "time" +) +import ( + "github.com/samuel/go-zookeeper/zk" + "github.com/stretchr/testify/assert" +) +import ( + "github.com/apache/dubbo-go/remoting" +) + +func initZkData(t *testing.T) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event) { + ts, client, event, err := NewMockZookeeperClient("test", 15*time.Second) + assert.NoError(t, err) + + data := ` + dubbo.consumer.request_timeout=5s + dubbo.consumer.connect_timeout=5s + dubbo.application.organization=ikurento.com + dubbo.application.name=BDTService + dubbo.application.module=dubbogo user-info server + dubbo.application.version=0.0.1 + dubbo.application.owner=ZX + dubbo.application.environment=dev + dubbo.registries.hangzhouzk.protocol=zookeeper + dubbo.registries.hangzhouzk.timeout=3s + dubbo.registries.hangzhouzk.address=127.0.0.1:2181 + dubbo.registries.shanghaizk.protocol=zookeeper + dubbo.registries.shanghaizk.timeout=3s + dubbo.registries.shanghaizk.address=127.0.0.1:2182 + dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo + dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider + dubbo.service.com.ikurento.user.UserProvider.loadbalance=random + dubbo.service.com.ikurento.user.UserProvider.warmup=100 + dubbo.service.com.ikurento.user.UserProvider.cluster=failover +` + + err = client.Create("/dubbo/dubbo.properties") + assert.NoError(t, err) + + _, err = client.Conn.Set("/dubbo/dubbo.properties", []byte(data), 0) + assert.NoError(t, err) + + return ts, client, event +} +func TestListener(t *testing.T) { + changedData := ` + dubbo.consumer.request_timeout=3s + dubbo.consumer.connect_timeout=5s + dubbo.application.organization=ikurento.com + dubbo.application.name=BDTService + dubbo.application.module=dubbogo user-info server + dubbo.application.version=0.0.1 + dubbo.application.owner=ZX + dubbo.application.environment=dev + dubbo.registries.hangzhouzk.protocol=zookeeper + dubbo.registries.hangzhouzk.timeout=3s + dubbo.registries.hangzhouzk.address=127.0.0.1:2181 + dubbo.registries.shanghaizk.protocol=zookeeper + dubbo.registries.shanghaizk.timeout=3s + dubbo.registries.shanghaizk.address=127.0.0.1:2182 + dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo + dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider + dubbo.service.com.ikurento.user.UserProvider.loadbalance=random + dubbo.service.com.ikurento.user.UserProvider.warmup=100 + dubbo.service.com.ikurento.user.UserProvider.cluster=failover +` + + ts, client, event := initZkData(t) + defer ts.Stop() + client.Wait.Add(1) + go client.HandleZkEvent(event) + listener := NewZkEventListener(client) + dataListener := &mockDataListener{client: client, changedData: changedData} + listener.ListenServiceEvent("/dubbo", dataListener) + + _, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1) + assert.NoError(t, err) + client.Wait.Wait() + assert.Equal(t, changedData, dataListener.eventList[1].Content) +} + +type mockDataListener struct { + eventList []remoting.Event + client *ZookeeperClient + changedData string +} + +func (m *mockDataListener) DataChange(eventType remoting.Event) bool { + fmt.Println(eventType) + m.eventList = append(m.eventList, eventType) + if eventType.Content == m.changedData { + m.client.Wait.Done() + } + return true +}