Skip to content
Snippets Groups Projects
Unverified Commit 7f5fc9f3 authored by nnsgmsone's avatar nnsgmsone Committed by GitHub
Browse files

Add batch's reference count and fix errcheck cases (#2541)

* Add batch's reference count and fix errcheck cases

* replace panic with require.NoError
parent 95c654e0
No related branches found
No related tags found
No related merge requests found
Showing
with 220 additions and 98 deletions
......@@ -17,6 +17,7 @@ package batch
import (
"bytes"
"fmt"
"sync/atomic"
"github.com/matrixorigin/matrixone/pkg/container/vector"
"github.com/matrixorigin/matrixone/pkg/encoding"
......@@ -89,6 +90,9 @@ func GetVector(bat *Batch, pos int32) *vector.Vector {
}
func Clean(bat *Batch, m *mheap.Mheap) {
if atomic.AddInt64(&bat.Cnt, -1) != 0 {
return
}
for _, vec := range bat.Vecs {
if vec != nil {
vector.Clean(vec, m)
......
......@@ -40,13 +40,14 @@ func TestBatch(t *testing.T) {
SetLength(bat0, 10)
sels := []int64{1, 2, 3}
Shrink(bat0, sels)
Shuffle(bat1, sels, mp)
err := Shuffle(bat1, sels, mp)
require.NoError(t, err)
{
vecs := make([]*vector.Vector, 1)
Prefetch(bat0, []int32{0}, vecs)
}
fmt.Printf("%v\n", bat0.String())
_, err := bat0.Append(mp, bat1)
_, err = bat0.Append(mp, bat1)
require.NoError(t, err)
bat0.InitZsOne(Length(bat0))
Clean(bat0, mp)
......
......@@ -22,6 +22,7 @@ import (
// Batch represents a part of a relationship
type Batch struct {
Ht any // anything
Cnt int64 // reference count, default is 1
Zs []int64 // ring
Rs []ring.Ring // aggregation list
Vecs []*vector.Vector // columns
......
......@@ -159,7 +159,9 @@ func (r *ApproxCountDistinctRing) BulkFill(i int64, zs []int64, vec *vector.Vect
func (r *ApproxCountDistinctRing) Add(a interface{}, x, y int64) {
ar := a.(*ApproxCountDistinctRing)
r.Sk[x].Merge(ar.Sk[y])
if err := r.Sk[x].Merge(ar.Sk[y]); err != nil {
panic(err)
}
}
func (r *ApproxCountDistinctRing) BatchAdd(a interface{}, start int64, os []uint8, vps []uint64) {
......@@ -167,14 +169,17 @@ func (r *ApproxCountDistinctRing) BatchAdd(a interface{}, start int64, os []uint
for i := range os {
dest := vps[i] - 1
src := int64(i) + start
r.Sk[dest].Merge(ar.Sk[src])
if err := r.Sk[dest].Merge(ar.Sk[src]); err != nil {
panic(err)
}
}
}
func (r *ApproxCountDistinctRing) Mul(a interface{}, x, y, z int64) {
ar := a.(*ApproxCountDistinctRing)
r.Sk[x].Merge(ar.Sk[y])
if err := r.Sk[x].Merge(ar.Sk[y]); err != nil {
panic(err)
}
}
func (r *ApproxCountDistinctRing) Eval(_ []int64) *vector.Vector {
......
......@@ -36,11 +36,15 @@ type ApproxCountDistinctRing struct {
func (r *ApproxCountDistinctRing) Marshal(w io.Writer) error {
// length
n := len(r.Sk)
w.Write(encoding.EncodeUint32(uint32(n)))
if _, err := w.Write(encoding.EncodeUint32(uint32(n))); err != nil {
return err
}
// data & values
if n > 0 {
// in some tests Data is nil, encode Vs anyway
w.Write(encoding.EncodeUint64Slice(r.Vs))
if _, err := w.Write(encoding.EncodeUint64Slice(r.Vs)); err != nil {
return err
}
}
// sketches
for _, sk := range r.Sk {
......@@ -48,11 +52,17 @@ func (r *ApproxCountDistinctRing) Marshal(w io.Writer) error {
if err != nil {
return err
}
w.Write(encoding.EncodeUint32(uint32(len(sk_buf))))
w.Write(sk_buf)
if _, err := w.Write(encoding.EncodeUint32(uint32(len(sk_buf)))); err != nil {
return err
}
if _, err := w.Write(sk_buf); err != nil {
return err
}
}
// type
w.Write(encoding.EncodeType(r.Typ))
if _, err := w.Write(encoding.EncodeType(r.Typ)); err != nil {
return err
}
return nil
}
......@@ -90,7 +100,9 @@ func (r *ApproxCountDistinctRing) unmarshal(data []byte, proc *process.Process)
if proc == nil {
r.decodeData(data[:n*8])
} else {
r.decodeDataWithProc(data[:n*8], proc)
if err := r.decodeDataWithProc(data[:n*8], proc); err != nil {
return nil, err
}
}
data = data[n*8:]
}
......
......@@ -15,8 +15,9 @@
package types
import (
"github.com/stretchr/testify/require"
"testing"
"github.com/stretchr/testify/require"
)
func TestBytes_Reset(t *testing.T) {
......@@ -33,7 +34,8 @@ func TestBytes_Window(t *testing.T) {
func TestBytes_Append(t *testing.T) {
myBytes := Bytes{Data: []byte("nihaohellogutentagkonichiwa"), Offsets: []uint32{0, 5, 10, 18}, Lengths: []uint32{5, 5, 8, 9}}
appendBytes := [][]byte{[]byte("festina"), []byte("lente")}
myBytes.Append(appendBytes)
err := myBytes.Append(appendBytes)
require.NoError(t, err)
require.Equal(t, Bytes{Data: []byte("nihaohellogutentagkonichiwafestinalente"),
Offsets: []uint32{0, 5, 10, 18, 27, 34}, Lengths: []uint32{5, 5, 8, 9, 7, 5}}, myBytes)
}
......
This diff is collapsed.
......@@ -36,7 +36,9 @@ func New(addr string, maxsize int, log *zap.Logger) (Server, error) {
}
func (s *server) Stop() {
s.app.Stop()
if err := s.app.Stop(); err != nil {
panic(err)
}
}
func (s *server) Run() error {
......
......@@ -15,11 +15,13 @@
package rpcserver
import (
"github.com/matrixorigin/matrixone/pkg/logutil"
"os"
"testing"
"time"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/stretchr/testify/require"
"github.com/matrixorigin/matrixone/pkg/logger"
"github.com/matrixorigin/matrixone/pkg/rpcserver/message"
......@@ -39,7 +41,8 @@ func TestServer(t *testing.T) {
}
h := new(hello)
h.cmd = srv.Register(h.process)
srv.Run()
err = srv.Run()
require.NoError(t, err)
time.Sleep(10 * time.Second)
}
......
......@@ -123,6 +123,7 @@ func newTestCase(gm *guest.Mmu) connectorTestCase {
// create a new block based on the type information
func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -166,6 +166,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, poses []int32, ag
// create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -273,6 +273,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, ts []types.Type, rp []ResultPos, c
// create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -153,6 +153,7 @@ func BenchmarkLimit(b *testing.B) {
// create a new block based on the type information
func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -200,6 +200,7 @@ func newTestCase(m *mheap.Mheap, flgs []bool, needEval bool, ts []types.Type) gr
// create a new block based on the type information, flgs[i] == ture: has null
func newBatch(t *testing.T, flgs []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -172,6 +172,7 @@ func newTestCase(m *mheap.Mheap, limit uint64) limitTestCase {
// create a new block based on the type information
func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -176,6 +176,7 @@ func newTestCase(m *mheap.Mheap, offset uint64) offsetTestCase {
// create a new block based on the type information
func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -166,6 +166,7 @@ func newTestCase(m *mheap.Mheap, ds []bool, ts []types.Type, fs []order.Field) o
// create a new block based on the type information, ds[i] == true: in descending order
func newBatch(t *testing.T, ds []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
flg := ds[i]
......
......@@ -168,6 +168,7 @@ func newTestCase(m *mheap.Mheap, ds []bool, ts []types.Type, limit int64, fs []t
// create a new block based on the type information, ds[i] == true: in descending order
func newBatch(t *testing.T, ds []bool, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
flg := ds[i]
......
......@@ -153,6 +153,7 @@ func BenchmarkOffset(b *testing.B) {
// create a new block based on the type information
func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
......@@ -135,6 +135,7 @@ func newTestCase(m *mheap.Mheap, ts []types.Type, fs []Field) orderTestCase {
// create a new block based on the type information
func newBatch(t *testing.T, ts []types.Type, proc *process.Process, rows int64) *batch.Batch {
bat := batch.New(len(ts))
bat.Cnt = 1
bat.InitZsOne(int(rows))
for i := range bat.Vecs {
vec := vector.New(ts[i])
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment