Skip to content
Snippets Groups Projects
Commit 617ace27 authored by vito.he's avatar vito.he
Browse files

Add: unit_test for remoting zk & Mod: content-protocol back to content-type

parent 152b6265
No related branches found
No related tags found
No related merge requests found
...@@ -97,7 +97,7 @@ func (s *Server) handlePkg(conn net.Conn) { ...@@ -97,7 +97,7 @@ func (s *Server) handlePkg(conn net.Conn) {
ContentLength: int64(len(body)), ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)), Body: ioutil.NopCloser(bytes.NewReader(body)),
} }
rsp.Header.Del("Content-Protocol") rsp.Header.Del("Content-Type")
rsp.Header.Del("Content-Length") rsp.Header.Del("Content-Length")
rsp.Header.Del("Timeout") rsp.Header.Del("Timeout")
...@@ -136,10 +136,10 @@ func (s *Server) handlePkg(conn net.Conn) { ...@@ -136,10 +136,10 @@ func (s *Server) handlePkg(conn net.Conn) {
reqHeader["HttpMethod"] = r.Method reqHeader["HttpMethod"] = r.Method
httpTimeout := s.timeout httpTimeout := s.timeout
contentType := reqHeader["Content-Protocol"] contentType := reqHeader["Content-Type"]
if contentType != "application/json" && contentType != "application/json-rpc" { if contentType != "application/json" && contentType != "application/json-rpc" {
setTimeout(conn, httpTimeout) setTimeout(conn, httpTimeout)
r.Header.Set("Content-Protocol", "text/plain") r.Header.Set("Content-Type", "text/plain")
if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil { if errRsp := sendErrorResp(r.Header, []byte(perrors.WithStack(err).Error())); errRsp != nil {
logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s", logger.Warnf("sendErrorResp(header:%#v, error:%v) = error:%s",
r.Header, perrors.WithStack(err), errRsp) r.Header, perrors.WithStack(err), errRsp)
...@@ -254,7 +254,7 @@ func serveRequest(ctx context.Context, ...@@ -254,7 +254,7 @@ func serveRequest(ctx context.Context,
ContentLength: int64(len(body)), ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)), Body: ioutil.NopCloser(bytes.NewReader(body)),
} }
rsp.Header.Del("Content-Protocol") rsp.Header.Del("Content-Type")
rsp.Header.Del("Content-Length") rsp.Header.Del("Content-Length")
rsp.Header.Del("Timeout") rsp.Header.Del("Timeout")
for k, v := range header { for k, v := range header {
...@@ -278,7 +278,7 @@ func serveRequest(ctx context.Context, ...@@ -278,7 +278,7 @@ func serveRequest(ctx context.Context,
ContentLength: int64(len(body)), ContentLength: int64(len(body)),
Body: ioutil.NopCloser(bytes.NewReader(body)), Body: ioutil.NopCloser(bytes.NewReader(body)),
} }
rsp.Header.Del("Content-Protocol") rsp.Header.Del("Content-Type")
rsp.Header.Del("Content-Length") rsp.Header.Del("Content-Length")
rsp.Header.Del("Timeout") rsp.Header.Del("Timeout")
for k, v := range header { for k, v := range header {
...@@ -424,7 +424,7 @@ func serveRequest(ctx context.Context, ...@@ -424,7 +424,7 @@ func serveRequest(ctx context.Context,
ContentLength: int64(len(rspStream)), ContentLength: int64(len(rspStream)),
Body: ioutil.NopCloser(bytes.NewReader(rspStream)), Body: ioutil.NopCloser(bytes.NewReader(rspStream)),
} }
delete(header, "Content-Protocol") delete(header, "Content-Type")
delete(header, "Content-Length") delete(header, "Content-Length")
delete(header, "Timeout") delete(header, "Timeout")
for k, v := range header { for k, v := range header {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"context"
"sync"
"testing"
"time"
)
import (
"github.com/samuel/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/common"
)
type mockFacade struct {
client *ZookeeperClient
cltLock sync.Mutex
wg sync.WaitGroup
URL *common.URL
done chan struct{}
}
func (r *mockFacade) ZkClient() *ZookeeperClient {
return r.client
}
func (r *mockFacade) SetZkClient(client *ZookeeperClient) {
r.client = client
}
func (r *mockFacade) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
func (r *mockFacade) WaitGroup() *sync.WaitGroup {
return &r.wg
}
func (r *mockFacade) GetDone() chan struct{} {
return r.done
}
func (r *mockFacade) GetUrl() common.URL {
return *r.URL
}
func (r *mockFacade) Destroy() {
close(r.done)
r.wg.Wait()
}
func (r *mockFacade) RestartCallBack() bool {
return true
}
func (r *mockFacade) IsAvailable() bool {
return true
}
func Test_Fascade(t *testing.T) {
ts, z, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
defer ts.Stop()
url, _ := common.NewURL(context.Background(), "mock://127.0.0.1")
mock := &mockFacade{client: z, URL: &url}
go HandleClientRestart(mock)
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
z.Close()
verifyEventStateOrder(t, event, []zk.State{zk.StateDisconnected}, "event channel")
//time.Sleep(2e9)
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package zookeeper
import (
"fmt"
"testing"
"time"
)
import (
"github.com/samuel/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
import (
"github.com/apache/dubbo-go/remoting"
)
func initZkData(t *testing.T) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event) {
ts, client, event, err := NewMockZookeeperClient("test", 15*time.Second)
assert.NoError(t, err)
data := `
dubbo.consumer.request_timeout=5s
dubbo.consumer.connect_timeout=5s
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info server
dubbo.application.version=0.0.1
dubbo.application.owner=ZX
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
err = client.Create("/dubbo/dubbo.properties")
assert.NoError(t, err)
_, err = client.Conn.Set("/dubbo/dubbo.properties", []byte(data), 0)
assert.NoError(t, err)
return ts, client, event
}
func TestListener(t *testing.T) {
changedData := `
dubbo.consumer.request_timeout=3s
dubbo.consumer.connect_timeout=5s
dubbo.application.organization=ikurento.com
dubbo.application.name=BDTService
dubbo.application.module=dubbogo user-info server
dubbo.application.version=0.0.1
dubbo.application.owner=ZX
dubbo.application.environment=dev
dubbo.registries.hangzhouzk.protocol=zookeeper
dubbo.registries.hangzhouzk.timeout=3s
dubbo.registries.hangzhouzk.address=127.0.0.1:2181
dubbo.registries.shanghaizk.protocol=zookeeper
dubbo.registries.shanghaizk.timeout=3s
dubbo.registries.shanghaizk.address=127.0.0.1:2182
dubbo.service.com.ikurento.user.UserProvider.protocol=dubbo
dubbo.service.com.ikurento.user.UserProvider.interface=com.ikurento.user.UserProvider
dubbo.service.com.ikurento.user.UserProvider.loadbalance=random
dubbo.service.com.ikurento.user.UserProvider.warmup=100
dubbo.service.com.ikurento.user.UserProvider.cluster=failover
`
ts, client, event := initZkData(t)
defer ts.Stop()
client.Wait.Add(1)
go client.HandleZkEvent(event)
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData}
listener.ListenServiceEvent("/dubbo", dataListener)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
client.Wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
}
type mockDataListener struct {
eventList []remoting.Event
client *ZookeeperClient
changedData string
}
func (m *mockDataListener) DataChange(eventType remoting.Event) bool {
fmt.Println(eventType)
m.eventList = append(m.eventList, eventType)
if eventType.Content == m.changedData {
m.client.Wait.Done()
}
return true
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment