blob: 79d2159bec5c27a38f1dac91f514b6f813e061e4 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wire
import (
"fmt"
"math/rand"
"testing"
"cloud.google.com/go/pubsublite/internal/test"
)
// TestRoundRobinMsgRouter checks the sequence of partitions returned by
// repeated Route calls on a round robin router.
//
// Each case builds the router from a rand.Rand backed by a test.FakeSource
// whose Ret field is the case's source value. In the table below, every
// expected sequence begins at source modulo partitionCount and then advances
// by one per call, wrapping back to 0 after partitionCount-1.
func TestRoundRobinMsgRouter(t *testing.T) {
	type testCase struct {
		partitionCount int
		source         int64
		want           []int
	}
	cases := []testCase{
		{partitionCount: 8, source: 9, want: []int{1, 2, 3, 4, 5, 6, 7, 0, 1}},
		{partitionCount: 5, source: 2, want: []int{2, 3, 4, 0, 1, 2}},
	}

	for _, c := range cases {
		name := fmt.Sprintf("partitionCount=%d", c.partitionCount)
		t.Run(name, func(t *testing.T) {
			rng := rand.New(&test.FakeSource{Ret: c.source})
			router := newRoundRobinMsgRouter(rng, c.partitionCount)

			for idx := range c.want {
				expected := c.want[idx]
				// The key is irrelevant to the expected sequence; a fixed
				// placeholder is passed on every call.
				if actual := router.Route([]byte("IGNORED")); actual != expected {
					t.Errorf("i=%d: Route() = %d, want = %d", idx, actual, expected)
				}
			}
		})
	}
}
// TestHashingMsgRouter checks two properties of the hashing router for a
// fixed set of keys and for each partition count in the table:
//
//   - routing the same key twice yields the same partition, and
//   - the returned partition lies in [0, partitionCount).
//
// Failure messages print the key with %q so it appears as readable text
// rather than a list of byte values, and include the valid range so an
// out-of-range result can be diagnosed from the log alone.
func TestHashingMsgRouter(t *testing.T) {
	keys := [][]byte{
		[]byte("foo1"),
		[]byte("foo2"),
		[]byte("foo3"),
		[]byte("foo4"),
		[]byte("foo5"),
	}

	for _, tc := range []struct {
		partitionCount int
	}{
		{partitionCount: 10},
		{partitionCount: 5},
	} {
		t.Run(fmt.Sprintf("partitionCount=%d", tc.partitionCount), func(t *testing.T) {
			msgRouter := newHashingMsgRouter(tc.partitionCount)
			for _, key := range keys {
				// Route the same key twice to detect non-deterministic routing.
				p1 := msgRouter.Route(key)
				p2 := msgRouter.Route(key)
				if p1 != p2 {
					t.Errorf("Route(%q) returned different partitions for same key: %d, %d", key, p1, p2)
				}
				if p1 < 0 || p1 >= tc.partitionCount {
					t.Errorf("Route(%q) returned partition out of range: got %d, want in [0, %d)", key, p1, tc.partitionCount)
				}
			}
		})
	}
}
// fakeMsgRouter is a test double whose Route result depends only on its two
// fields, never on the key it is given.
type fakeMsgRouter struct {
	multiplier, partitionCount int
}

// Route ignores orderingKey and returns the product of partitionCount and
// multiplier.
func (r *fakeMsgRouter) Route(orderingKey []byte) int {
	product := r.multiplier * r.partitionCount
	return product
}
func TestCompositeMsgRouter(t *testing.T) {
for _, tc := range []struct {
desc string
partitionCount int
key []byte
want int
}{
{
desc: "key",
partitionCount: 2,
key: []byte("foo"),
want: 20,
},
{
desc: "nil key",
partitionCount: 8,
key: nil,
want: 800,
},
{
desc: "empty key",
partitionCount: 5,
key: []byte{},
want: 500,
},
} {
t.Run(tc.desc, func(t *testing.T) {
msgRouter := &compositeMsgRouter{
keyedRouter: &fakeMsgRouter{
multiplier: 10,
partitionCount: tc.partitionCount,
},
keylessRouter: &fakeMsgRouter{
multiplier: 100,
partitionCount: tc.partitionCount,
},
}
if got := msgRouter.Route(tc.key); got != tc.want {
t.Errorf("Route() = %d, want = %d", got, tc.want)
}
})
}
}