diff --git a/pkg/sql/plan2/function/builtin/unary/space.go b/pkg/sql/plan2/function/builtin/unary/space.go
index 515715ab62f4b8e51d1a247890bf3563c6ce9783..5d320d313166a781afd72f0341ce6de7ce474bf8 100644
--- a/pkg/sql/plan2/function/builtin/unary/space.go
+++ b/pkg/sql/plan2/function/builtin/unary/space.go
@@ -23,6 +23,7 @@ import (
 	"golang.org/x/exp/constraints"
 )
 
+// the function registeration for generics functions may have some problem now, change this to generics later
 func SpaceInt64(vectors []*vector.Vector, proc *process.Process) (*vector.Vector, error) {
 	inputVector := vectors[0]
 	inputValues := inputVector.Col.([]int64)
@@ -32,15 +33,18 @@ func SpaceInt64(vectors []*vector.Vector, proc *process.Process) (*vector.Vector
 			return proc.AllocScalarNullVector(resultType), nil
 		}
 		resultVector := vector.NewConst(resultType)
+		bytesNeed := space.CountSpacesSigned(inputValues)
 		results := &types.Bytes{
-			Data:    make([]byte, inputValues[0]),
+			Data:    make([]byte, bytesNeed),
 			Offsets: make([]uint32, 1),
 			Lengths: make([]uint32, 1),
 		}
-		vector.SetCol(resultVector, space.FillSpacesInt64(inputValues, results))
+		result := space.FillSpacesSigned[int64](inputValues, results)
+		nulls.Or(inputVector.Nsp, result.Nsp, resultVector.Nsp)
+		vector.SetCol(resultVector, result.Result)
 		return resultVector, nil
 	}
-	bytesNeed := space.CountSpacesForInt64(inputValues)
+	bytesNeed := space.CountSpacesSigned(inputValues)
 	resultVector, err := proc.AllocVector(resultType, bytesNeed)
 	if err != nil {
 		return nil, err
@@ -50,10 +54,10 @@ func SpaceInt64(vectors []*vector.Vector, proc *process.Process) (*vector.Vector
 		Offsets: make([]uint32, len(inputValues)),
 		Lengths: make([]uint32, len(inputValues)),
 	}
-	nulls.Set(resultVector.Nsp, inputVector.Nsp)
-	vector.SetCol(resultVector, space.FillSpacesInt64(inputValues, resultValues))
+	result := space.FillSpacesSigned[int64](inputValues, resultValues)
+	nulls.Or(inputVector.Nsp, result.Nsp, resultVector.Nsp)
+	vector.SetCol(resultVector, result.Result)
 	return resultVector, nil
-
 }
 
 func SpaceUint64(vectors []*vector.Vector, proc *process.Process) (*vector.Vector, error) {
@@ -65,15 +69,18 @@ func SpaceUint64(vectors []*vector.Vector, proc *process.Process) (*vector.Vecto
 			return proc.AllocScalarNullVector(resultType), nil
 		}
 		resultVector := vector.NewConst(resultType)
+		bytesNeed := space.CountSpacesUnsigned[uint64](inputValues)
 		results := &types.Bytes{
-			Data:    make([]byte, inputValues[0]),
+			Data:    make([]byte, bytesNeed),
 			Offsets: make([]uint32, 1),
 			Lengths: make([]uint32, 1),
 		}
-		vector.SetCol(resultVector, space.FillSpacesUint64(inputValues, results))
+		result := space.FillSpacesUnsigned[uint64](inputValues, results)
+		nulls.Or(inputVector.Nsp, result.Nsp, resultVector.Nsp)
+		vector.SetCol(resultVector, result.Result)
 		return resultVector, nil
 	}
-	bytesNeed := space.CountSpacesForUint64(inputValues)
+	bytesNeed := space.CountSpacesUnsigned[uint64](inputValues)
 	resultVector, err := proc.AllocVector(resultType, bytesNeed)
 	if err != nil {
 		return nil, err
@@ -83,10 +90,10 @@ func SpaceUint64(vectors []*vector.Vector, proc *process.Process) (*vector.Vecto
 		Offsets: make([]uint32, len(inputValues)),
 		Lengths: make([]uint32, len(inputValues)),
 	}
-	nulls.Set(resultVector.Nsp, inputVector.Nsp)
-	vector.SetCol(resultVector, space.FillSpacesUint64(inputValues, resultValues))
+	result := space.FillSpacesUnsigned[uint64](inputValues, resultValues)
+	nulls.Or(inputVector.Nsp, result.Nsp, resultVector.Nsp)
+	vector.SetCol(resultVector, result.Result)
 	return resultVector, nil
-
 }
 
 func SpaceFloat[T constraints.Float](vectors []*vector.Vector, proc *process.Process) (*vector.Vector, error) {
@@ -98,15 +105,18 @@ func SpaceFloat[T constraints.Float](vectors []*vector.Vector, proc *process.Pro
 			return proc.AllocScalarNullVector(resultType), nil
 		}
 		resultVector := vector.NewConst(resultType)
+		bytesNeed := space.CountSpacesFloat[T](inputValues)
 		results := &types.Bytes{
-			Data:    make([]byte, int(inputValues[0]+1)),
+			Data:    make([]byte, bytesNeed),
 			Offsets: make([]uint32, 1),
 			Lengths: make([]uint32, 1),
 		}
-		vector.SetCol(resultVector, space.FillSpacesFloat[T](inputValues, results))
+		result := space.FillSpacesFloat[T](inputValues, results)
+		nulls.Or(inputVector.Nsp, result.Nsp, resultVector.Nsp)
+		vector.SetCol(resultVector, result.Result)
 		return resultVector, nil
 	}
-	bytesNeed := space.CountSpacesForFloat[T](inputValues)
+	bytesNeed := space.CountSpacesFloat[T](inputValues)
 	resultVector, err := proc.AllocVector(resultType, bytesNeed)
 	if err != nil {
 		return nil, err
@@ -116,8 +126,8 @@ func SpaceFloat[T constraints.Float](vectors []*vector.Vector, proc *process.Pro
 		Offsets: make([]uint32, len(inputValues)),
 		Lengths: make([]uint32, len(inputValues)),
 	}
-	nulls.Set(resultVector.Nsp, inputVector.Nsp)
-	vector.SetCol(resultVector, space.FillSpacesFloat[T](inputValues, resultValues))
+	result := space.FillSpacesFloat[T](inputValues, resultValues)
+	nulls.Or(inputVector.Nsp, result.Nsp, resultVector.Nsp)
+	vector.SetCol(resultVector, result.Result)
 	return resultVector, nil
-
 }
diff --git a/pkg/sql/plan2/function/builtin/unary/space_test.go b/pkg/sql/plan2/function/builtin/unary/space_test.go
new file mode 100644
index 0000000000000000000000000000000000000000..4e3a9fbf9ba85c40737583171f3bc50805eb2dc3
--- /dev/null
+++ b/pkg/sql/plan2/function/builtin/unary/space_test.go
@@ -0,0 +1,193 @@
+// Copyright 2022 Matrix Origin
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//      http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package unary
+
+import (
+	"fmt"
+	"github.com/matrixorigin/matrixone/pkg/container/nulls"
+	"github.com/matrixorigin/matrixone/pkg/container/types"
+	"github.com/matrixorigin/matrixone/pkg/container/vector"
+	"github.com/matrixorigin/matrixone/pkg/vm/mheap"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/guest"
+	"github.com/matrixorigin/matrixone/pkg/vm/mmu/host"
+	"github.com/matrixorigin/matrixone/pkg/vm/process"
+	"github.com/stretchr/testify/require"
+	"testing"
+)
+
+/*
+type TestCase struct {
+	Function        func([]*vector.Vector, *process.Process) (*vector.Vector, error)
+	InputVectors    []*vector.Vector
+	ExpectedVectors []*vector.Vector
+	ExpectError     bool
+}
+
+func RunTestCaseInteger[outputT constraints.Integer](testCases []TestCase) error {
+	mmu := host.New(1024 * 1024)
+	gm := guest.New(1024*1024, mmu)
+	mp := mheap.New(gm)
+	proc := process.New(mp)
+	for i, testCase := range testCases {
+		resultVector, err := testCase.Function(testCase.InputVectors, proc)
+		if err != nil {
+			if testCase.ExpectError {
+				break
+			} else {
+				return fmt.Errorf("unexpected error for vector %d", i)
+			}
+		}
+
+		resultValues := resultVector.Col.([]outputT)
+		expectedVector := testCase.ExpectedVectors[i]
+		expectedValues := expectedVector.Col.([]outputT)
+		for j, resultValue := range resultValues {
+			if resultValue != expectedValues[j] {
+				return fmt.Errorf("wrong result for vector %d", i)
+			}
+		}
+	}
+	return nil
+}
+*/
+
+func NewTestProc() *process.Process {
+	mmu := host.New(1024 * 1024)
+	gm := guest.New(1024*1024, mmu)
+	mp := mheap.New(gm)
+	proc := process.New(mp)
+	return proc
+}
+
+/*
+func RunTestCaseChar(testCases []TestCase) error {
+	proc := NewTestProc()
+	for i, testCase := range testCases {
+		resultVector, err := testCase.Function(testCase.InputVectors, proc)
+		if err != nil {
+			if testCase.ExpectError {
+				break
+			} else {
+				return fmt.Errorf("unexpected error for vector %d", i)
+			}
+		}
+
+		resultValues := resultVector.Col.(*types.Bytes)
+		expectedVector := testCase.ExpectedVectors[i]
+		expectedValues := expectedVector.Col.(*types.Bytes)
+		for j := 0; j < len(resultValues.Lengths)-1; j++ {
+			fmt.Println("...", resultValues.Offsets[j], string(resultValues.Data[resultValues.Offsets[j]:resultValues.Offsets[j]+resultValues.Lengths[j]]))
+			expectedNull := nulls.Contains(expectedVector.Nsp, uint64(j))
+			if expectedNull {
+				if !nulls.Contains(resultVector.Nsp, uint64(j)) {
+					return fmt.Errorf("wrong result for vector %d", i)
+				}
+			} else {
+				if reflect.DeepEqual(resultValues.Data[resultValues.Offsets[j]:resultValues.Offsets[j]+resultValues.Lengths[j]], expectedValues.Data[expectedValues.Offsets[j]:expectedValues.Offsets[j]+expectedValues.Lengths[j]]) {
+					return fmt.Errorf("wrong result for vector %d", i)
+				}
+			}
+		}
+	}
+	return nil
+}
+
+*/
+
+func makeInt64Vector(values []int64, nsp []uint64) *vector.Vector {
+	vec := vector.New(types.Type{Oid: types.T_int64})
+	vec.Col = values
+	for _, n := range nsp {
+		nulls.Add(vec.Nsp, n)
+	}
+	return vec
+}
+
+func makeUint64Vector(values []uint64, nsp []uint64) *vector.Vector {
+	vec := vector.New(types.Type{Oid: types.T_uint64})
+	vec.Col = values
+	for _, n := range nsp {
+		nulls.Add(vec.Nsp, n)
+	}
+	return vec
+}
+
+func makeFloat64Vector(values []float64, nsp []uint64) *vector.Vector {
+	vec := vector.New(types.Type{Oid: types.T_float64})
+	vec.Col = values
+	for _, n := range nsp {
+		nulls.Add(vec.Nsp, n)
+	}
+	return vec
+}
+
+/*
+func makeCharVector(values []string, nsp []uint64) *vector.Vector {
+	vec := vector.New(types.Type{Oid: types.T_char})
+	colValue := new(types.Bytes)
+	colValue.Offsets = make([]uint32, len(values))
+	colValue.Lengths = make([]uint32, len(values))
+	offset := uint32(0)
+	for i, s := range values {
+		lengthS := len(s)
+		colValue.Data = append(colValue.Data, []byte(s)...)
+		colValue.Offsets[i] = offset
+		colValue.Lengths[i] = uint32(lengthS)
+		offset += uint32(lengthS)
+	}
+	for _, n := range nsp {
+		nulls.Add(vec.Nsp, n)
+	}
+	vec.Col = colValue
+	return vec
+}
+
+*/
+
+func TestSpaceUint64(t *testing.T) {
+	inputVector := makeUint64Vector([]uint64{1, 2, 3, 0, 9999999}, []uint64{4})
+	proc := NewTestProc()
+	output, err := SpaceUint64([]*vector.Vector{inputVector}, proc)
+	require.NoError(t, err)
+	result := output.Col.(*types.Bytes)
+	// the correct result should be:
+	// [32 32 32 32 32 32] [1 2 3 0 0] [0 1 3 6 6]
+	fmt.Println(result.Data, result.Lengths, result.Offsets)
+
+}
+
+func TestSpaceInt64(t *testing.T) {
+	inputVector := makeInt64Vector([]int64{1, 2, 3, 0, -1, 9999999}, []uint64{4})
+	proc := NewTestProc()
+	output, err := SpaceInt64([]*vector.Vector{inputVector}, proc)
+	require.NoError(t, err)
+	result := output.Col.(*types.Bytes)
+	// the correct result should be:
+	// [32 32 32 32 32 32] [1 2 3 0 0 0] [0 1 3 6 6 6]
+	fmt.Println(result.Data, result.Lengths, result.Offsets)
+
+}
+
+func TestSpaceFloat64(t *testing.T) {
+	inputVector := makeFloat64Vector([]float64{1.4, 1.6, 3.3, 0, -1, 9999999}, []uint64{4})
+	proc := NewTestProc()
+	output, err := SpaceFloat[float64]([]*vector.Vector{inputVector}, proc)
+	require.NoError(t, err)
+	result := output.Col.(*types.Bytes)
+	// the correct result should be:
+	// [32 32 32 32 32 32] [1 2 3 0 0 0] [0 1 3 6 6 6]
+	fmt.Println(result.Data, result.Lengths, result.Offsets)
+
+}
diff --git a/pkg/vectorize/space/space.go b/pkg/vectorize/space/space.go
index baa4f4de29ada6a261bd8746ccc95114207828af..048a41975c5d65b28112cdb6841d32d55716da64 100644
--- a/pkg/vectorize/space/space.go
+++ b/pkg/vectorize/space/space.go
@@ -12,10 +12,15 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// for input value greater than 8000, function space will output NULL,
+// for input value less than 0, function space will output empty string ""
+// for positive float inputs, function space will round(away from zero)
+
 package space
 
 import (
 	"bytes"
+	"github.com/matrixorigin/matrixone/pkg/container/nulls"
 	"golang.org/x/exp/constraints"
 	"math"
 	"unicode"
@@ -24,6 +29,8 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/vectorize/sum"
 )
 
+var MaxAllowedValue = int64(8000)
+
 func CountSpacesForUnsignedInt(originalVecCol interface{}) int64 {
 	switch col := originalVecCol.(type) {
 	case []uint8:
@@ -60,9 +67,48 @@ func CountSpacesForSignedInt(originalVecCol interface{}) int64 {
 	}
 }
 
-// todo: apply generics for sum, then modify this
-func CountSpacesForInt64(columnValues []int64) int64 {
-	result := sum.Int64Sum(columnValues)
+type Result struct {
+	Result *types.Bytes
+	Nsp    *nulls.Nulls
+}
+
+func CountSpacesSigned[T constraints.Signed](columnValues []T) int64 {
+	result := int64(0)
+	for _, columnValue := range columnValues {
+		if columnValue <= 0 || int64(columnValue) > MaxAllowedValue {
+			continue
+		} else {
+			result += int64(columnValue)
+		}
+	}
+	if result < 0 {
+		return 0
+	} else {
+		return result
+	}
+}
+
+func CountSpacesUnsigned[T constraints.Unsigned](columnValues []T) int64 {
+	result := int64(0)
+	for _, columnValue := range columnValues {
+		if int64(columnValue) > MaxAllowedValue {
+			continue
+		} else {
+			result += int64(columnValue)
+		}
+	}
+	return result
+}
+
+func CountSpacesFloat[T constraints.Float](columnValues []T) int64 {
+	var result int64
+
+	for _, columnValue := range columnValues {
+		if columnValue < 0 || int64(columnValue) > MaxAllowedValue {
+			continue
+		}
+		result += int64(math.Round(float64(columnValue)))
+	}
 	if result < 0 {
 		return 0
 	} else {
@@ -70,10 +116,76 @@ func CountSpacesForInt64(columnValues []int64) int64 {
 	}
 }
 
+func FillSpacesUnsigned[T constraints.Unsigned](originalVecCol []T, resultBytes *types.Bytes) Result {
+	result := Result{Result: resultBytes, Nsp: new(nulls.Nulls)}
+	var offset uint32 = 0
+	for i, length := range originalVecCol {
+		if int64(length) > MaxAllowedValue {
+			resultBytes.Lengths[i] = 0
+			resultBytes.Offsets[i] = offset
+			nulls.Add(result.Nsp, uint64(i))
+		} else {
+			resultBytes.Lengths[i] = uint32(length)
+			resultBytes.Offsets[i] = offset
+			offset += uint32(length)
+		}
+	}
+	for i := range resultBytes.Data {
+		resultBytes.Data[i] = ' '
+	}
+	return result
+}
+
+func FillSpacesSigned[T constraints.Signed](originalVecCol []T, resultBytes *types.Bytes) Result {
+	result := Result{Result: resultBytes, Nsp: new(nulls.Nulls)}
+	var offset uint32 = 0
+	for i, length := range originalVecCol {
+		if length <= 0 {
+			resultBytes.Lengths[i] = 0
+			resultBytes.Offsets[i] = offset
+		} else if int64(length) > MaxAllowedValue {
+			resultBytes.Lengths[i] = 0
+			resultBytes.Offsets[i] = offset
+			nulls.Add(result.Nsp, uint64(i))
+		} else {
+			resultBytes.Lengths[i] = uint32(length) // this cast is guaranteed safe because length > 0
+			resultBytes.Offsets[i] = offset
+			offset += uint32(length)
+		}
+	}
+	for i := range resultBytes.Data {
+		resultBytes.Data[i] = ' '
+	}
+	return result
+}
+
+func FillSpacesFloat[T constraints.Float](originalVecCol []T, resultBytes *types.Bytes) Result {
+	result := Result{Result: resultBytes, Nsp: new(nulls.Nulls)}
+	var offset uint32 = 0
+	for i, length := range originalVecCol {
+		roundLen := math.Round(float64(length))
+		if roundLen <= 0 {
+			resultBytes.Lengths[i] = 0
+			resultBytes.Offsets[i] = offset
+		} else if int64(length) > MaxAllowedValue {
+			resultBytes.Lengths[i] = 0
+			resultBytes.Offsets[i] = offset
+			nulls.Add(result.Nsp, uint64(i))
+		} else {
+			resultBytes.Lengths[i] = uint32(roundLen)
+			resultBytes.Offsets[i] = offset
+			offset += uint32(roundLen)
+		}
+	}
+	for i := range resultBytes.Data {
+		resultBytes.Data[i] = ' '
+	}
+	return result
+}
+
 func CountSpacesForUint64(columnValues []uint64) int64 {
 	return int64(sum.Uint64Sum(columnValues))
 }
-
 func CountSpacesForFloat[T constraints.Float](columnValues []T) int64 {
 	var result int64
 
@@ -113,11 +225,9 @@ func parseStringAsInt64(s string) int64 {
 
 func CountSpacesForCharVarChar(originalVecCol *types.Bytes) int64 {
 	var result int64
-
 	for i, offset := range originalVecCol.Offsets {
 		result += parseStringAsInt64(string(originalVecCol.Data[offset : offset+originalVecCol.Lengths[i]]))
 	}
-
 	return result
 }
 
@@ -263,7 +373,8 @@ func FillSpacesInt64(originalVecCol []int64, result *types.Bytes) *types.Bytes {
 	return result
 }
 
-func FillSpacesInteger[T constraints.Integer](originalVecCol []T, result *types.Bytes) *types.Bytes {
+/*
+func FillSpacesSigned[T constraints.Integer](originalVecCol []T, result *types.Bytes) *types.Bytes {
 	var offset uint32 = 0
 	for i, length := range originalVecCol {
 		result.Lengths[i] = uint32(length)
@@ -278,22 +389,8 @@ func FillSpacesInteger[T constraints.Integer](originalVecCol []T, result *types.
 	return result
 }
 
-func FillSpacesFloat[T constraints.Float](originalVecCol []T, result *types.Bytes) *types.Bytes {
-	var offset uint32 = 0
-	for i, length := range originalVecCol {
-		roundLen := math.Round(float64(length))
-
-		result.Lengths[i] = uint32(roundLen)
-		result.Offsets[i] = offset
-		offset += uint32(roundLen)
-	}
-
-	for i := range result.Data {
-		result.Data[i] = ' '
-	}
+*/
 
-	return result
-}
 func FillSpacesFloat32(originalVecCol []float32, result *types.Bytes) *types.Bytes {
 	var offset uint32 = 0
 	for i, length := range originalVecCol {