blob: 623127962ff050c78bbf7d5923a941787e0444c2 [file] [log] [blame]
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
package wire
import (
"testing"
"cloud.google.com/go/pubsublite/internal/test"
)
func emptyAckConsumer(_ *ackConsumer) {
// Nothing to do.
}
func TestAckConsumerAck(t *testing.T) {
numAcks := 0
onAck := func(ac *ackConsumer) {
numAcks++
}
ackConsumer := newAckConsumer(0, 0, onAck)
if got, want := ackConsumer.IsAcked(), false; got != want {
t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
}
// Test duplicate acks.
for i := 0; i < 3; i++ {
ackConsumer.Ack()
if got, want := ackConsumer.IsAcked(), true; got != want {
t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
}
if got, want := numAcks, 1; got != want {
t.Errorf("onAck func called %v times, expected %v call", got, want)
}
}
}
func TestAckConsumerClear(t *testing.T) {
onAck := func(ac *ackConsumer) {
t.Error("onAck func should not have been called")
}
ackConsumer := newAckConsumer(0, 0, onAck)
ackConsumer.Clear()
ackConsumer.Ack()
if got, want := ackConsumer.IsAcked(), true; got != want {
t.Errorf("ackConsumer.IsAcked() got %v, want %v", got, want)
}
}
func TestAckTrackerProcessing(t *testing.T) {
ackTracker := newAckTracker()
// No messages received yet.
if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, emptyAckConsumer)
ack3 := newAckConsumer(3, 0, emptyAckConsumer)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack2); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack3); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
// All messages unacked.
if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
ack1.Ack()
if got, want := ackTracker.CommitOffset(), int64(2); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// Skipped ack2, so the commit offset should not have been updated.
ack3.Ack()
if got, want := ackTracker.CommitOffset(), int64(2); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// Both ack2 and ack3 should be removed from the outstanding acks queue.
ack2.Ack()
if got, want := ackTracker.CommitOffset(), int64(4); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// Newly received message.
ack4 := newAckConsumer(4, 0, emptyAckConsumer)
if err := ackTracker.Push(ack4); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
ack4.Ack()
if got, want := ackTracker.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
if got, want := ackTracker.CommitOffset(), int64(5); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
}
func TestAckTrackerRelease(t *testing.T) {
ackTracker := newAckTracker()
onAckAfterRelease := func(ac *ackConsumer) {
t.Error("onAck should not be called")
}
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, onAckAfterRelease)
ack3 := newAckConsumer(3, 0, onAckAfterRelease)
ack4 := newAckConsumer(4, 0, onAckAfterRelease)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack2); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack3); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
// First ack is called before Release and should be processed.
ack1.Ack()
// After clearing outstanding acks, onAck should not be called.
ackTracker.Release()
ack2.Ack()
ack3.Ack()
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// New acks should be cleared and discarded.
if err := ackTracker.Push(ack4); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
ack4.Ack()
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
if got, want := ackTracker.CommitOffset(), int64(2); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
}
func TestAckTrackerReset(t *testing.T) {
ackTracker := newAckTracker()
onAckAfterReset := func(ac *ackConsumer) {
t.Error("onAck should not be called")
}
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, emptyAckConsumer)
ack3 := newAckConsumer(3, 0, onAckAfterReset)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack2); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack3); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
// Ack tracker should not allow duplicate msg1.
if got, want := ackTracker.Push(ack1), errOutOfOrderMessages; !test.ErrorEqual(got, want) {
t.Errorf("ackTracker.Push() got err %v, want err %v", got, want)
}
// Ack 2 messages to advance the commit offset.
ack1.Ack()
ack2.Ack()
if got, want := ackTracker.CommitOffset(), int64(3); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// Reset should clear the outstanding acks and reset the desired commit
// offset.
ackTracker.Reset()
// Outstanding ack3 should be invalidated.
ack3.Ack()
if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
// After reset, msg1 should be accepted and processed.
ack1 = newAckConsumer(1, 0, emptyAckConsumer)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if got, want := ackTracker.Empty(), false; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
ack1.Ack()
if got, want := ackTracker.CommitOffset(), int64(2); got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
}
func TestCommitCursorTrackerProcessing(t *testing.T) {
ackTracker := newAckTracker()
commitTracker := newCommitCursorTracker(ackTracker)
// No messages received yet.
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), true; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, emptyAckConsumer)
ack3 := newAckConsumer(3, 0, emptyAckConsumer)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack2); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack3); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
// All messages unacked.
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Msg1 acked and commit sent to stream.
ack1.Ack()
if got, want := commitTracker.NextOffset(), int64(2); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Msg 2 & 3 acked commit and sent to stream.
ack2.Ack()
if got, want := commitTracker.NextOffset(), int64(3); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
ack3.Ack()
if got, want := commitTracker.NextOffset(), int64(4); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// First 2 pending commits acknowledged.
if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
}
if err := commitTracker.ConfirmOffsets(2); err != nil {
t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
}
if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), true; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
}
func TestCommitCursorTrackerReset(t *testing.T) {
ackTracker := newAckTracker()
commitTracker := newCommitCursorTracker(ackTracker)
onAckAfterReset := func(ac *ackConsumer) {
t.Error("onAck should not be called")
}
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, emptyAckConsumer)
ack3 := newAckConsumer(3, 0, onAckAfterReset)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack2); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack3); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
// Ack and commit 2 messages.
ack1.Ack()
ack2.Ack()
if got, want := commitTracker.NextOffset(), int64(3); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if err := commitTracker.ConfirmOffsets(1); err != nil {
t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
}
// Reset should clear the ack tracker and commit tracker to their initial
// states.
commitTracker.Reset()
// Outstanding ack3 should be invalidated.
ack3.Ack()
if got, want := ackTracker.CommitOffset(), nilCursorOffset; got != want {
t.Errorf("ackTracker.CommitOffset() got %v, want %v", got, want)
}
if got, want := ackTracker.Empty(), true; got != want {
t.Errorf("ackTracker.Empty() got %v, want %v", got, want)
}
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), true; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// After reset, msg1 should be accepted and processed.
ack1 = newAckConsumer(1, 0, emptyAckConsumer)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
ack1.Ack()
if got, want := commitTracker.NextOffset(), int64(2); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if err := commitTracker.ConfirmOffsets(1); err != nil {
t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
}
}
func TestCommitCursorTrackerStreamReconnects(t *testing.T) {
ackTracker := newAckTracker()
commitTracker := newCommitCursorTracker(ackTracker)
ack1 := newAckConsumer(1, 0, emptyAckConsumer)
ack2 := newAckConsumer(2, 0, emptyAckConsumer)
ack3 := newAckConsumer(3, 0, emptyAckConsumer)
if err := ackTracker.Push(ack1); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack2); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
if err := ackTracker.Push(ack3); err != nil {
t.Errorf("ackTracker.Push() got err %v", err)
}
// All messages unacked.
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Msg1 acked and commit sent to stream.
ack1.Ack()
if got, want := commitTracker.NextOffset(), int64(2); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Msg2 acked and commit sent to stream.
ack2.Ack()
if got, want := commitTracker.NextOffset(), int64(3); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Stream breaks and pending offsets are cleared.
commitTracker.ClearPending()
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// When the stream reconnects the next offset should be 3 (offset 2 skipped).
if got, want := commitTracker.NextOffset(), int64(3); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Msg3 acked and commit sent to stream.
ack3.Ack()
if got, want := commitTracker.NextOffset(), int64(4); got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
commitTracker.AddPending(commitTracker.NextOffset())
if got, want := commitTracker.NextOffset(), nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.NextOffset() got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Only 1 pending commit confirmed.
if got, want := commitTracker.lastConfirmedOffset, nilCursorOffset; got != want {
t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
}
if err := commitTracker.ConfirmOffsets(1); err != nil {
t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
}
if got, want := commitTracker.lastConfirmedOffset, int64(3); got != want {
t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), false; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
// Final pending commit confirmed.
if err := commitTracker.ConfirmOffsets(1); err != nil {
t.Errorf("commitCursorTracker.ConfirmOffsets() got err %v", err)
}
if got, want := commitTracker.lastConfirmedOffset, int64(4); got != want {
t.Errorf("commitCursorTracker.lastConfirmedOffset got %v, want %v", got, want)
}
if got, want := commitTracker.UpToDate(), true; got != want {
t.Errorf("commitCursorTracker.UpToDate() got %v, want %v", got, want)
}
}