| // Copyright 2020 Google LLC |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // https://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package test |
| |
| import ( |
| "fmt" |
| "strconv" |
| "strings" |
| "sync" |
| ) |
| |
// OrderingSender produces message strings that embed an increasing message
// index, which receivers use to verify message ordering. It is used in
// conjunction with Publishers.
type OrderingSender struct {
	// TotalMsgCount is the number of messages generated by Next so far. Next
	// embeds the incremented value in each string it returns.
	TotalMsgCount int64
}

// NewOrderingSender creates a new OrderingSender whose message count starts at
// zero.
func NewOrderingSender() *OrderingSender {
	return &OrderingSender{}
}

// Next increments the message count and returns the next string to publish,
// formatted as "<prefix>/<index>". For a freshly created sender the first
// index is 1.
func (s *OrderingSender) Next(prefix string) string {
	next := s.TotalMsgCount + 1
	s.TotalMsgCount = next
	return fmt.Sprintf("%s/%d", prefix, next)
}
| |
// OrderingReceiver consumes message strings generated by OrderingSender and
// verifies that the messages received for each key carry strictly increasing
// indices. It is used in conjunction with Subscribers.
type OrderingReceiver struct {
	// mu guards received.
	mu sync.Mutex
	// received maps a key to the index of the last message accepted for that
	// key. Messages are only guaranteed to be received in order within a
	// partition.
	received map[string]int64
}

// NewOrderingReceiver creates a new OrderingReceiver with no recorded
// messages.
func NewOrderingReceiver() *OrderingReceiver {
	r := new(OrderingReceiver)
	r.received = make(map[string]int64)
	return r
}

// parseMsgIndex extracts the base-10 integer that follows the last "/" in msg.
// It returns -1 if msg contains no "/" or if the text after it does not parse
// as an int64. A negative number after the separator is returned as-is;
// Receive treats any negative result as a parse failure.
func parseMsgIndex(msg string) int64 {
	sep := strings.LastIndex(msg, "/")
	if sep < 0 {
		return -1
	}
	idx, err := strconv.ParseInt(msg[sep+1:], 10, 64)
	if err != nil {
		return -1
	}
	return idx
}

// Receive parses the message index from data and compares it with the last
// index recorded for key. It returns an error if the index cannot be parsed or
// if it is not greater than the last recorded index; otherwise the index is
// recorded as the latest for key and nil is returned.
//
// Note: a normal scenario resulting in unordered messages is when the Publish
// stream breaks while there are in-flight batches, which are resent upon
// stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a
// test.
func (r *OrderingReceiver) Receive(data, key string) error {
	r.mu.Lock()
	defer r.mu.Unlock()

	msgIdx := parseMsgIndex(data)
	if msgIdx < 0 {
		return fmt.Errorf("failed to parse index from message: %q", data)
	}

	// The first message for a key is always accepted; later ones must carry a
	// strictly larger index than the previous one.
	if prev, seen := r.received[key]; seen && prev >= msgIdx {
		return fmt.Errorf("message ordering failed for key %s, expected message idx > %d, got %d", key, prev, msgIdx)
	}
	r.received[key] = msgIdx
	return nil
}
| |
// void is the zero-size value stored for every entry of a set-like map.
var void = struct{}{}

// msgMetadata records the offsets at which a single message was observed.
type msgMetadata struct {
	// offsets is the set of offsets recorded for the message.
	offsets map[int64]struct{}
}

// newMsgMetadata returns a msgMetadata with an empty offset set.
func newMsgMetadata() *msgMetadata {
	md := new(msgMetadata)
	md.offsets = make(map[int64]struct{})
	return md
}

// ContainsOffset reports whether offset has already been recorded.
func (mm *msgMetadata) ContainsOffset(offset int64) bool {
	_, found := mm.offsets[offset]
	return found
}

// AddOffset records offset. Adding an offset that is already present has no
// further effect.
func (mm *msgMetadata) AddOffset(offset int64) {
	mm.offsets[offset] = void
}
| |
| // DuplicateMsgDetector can be used to detect duplicate messages, either due to |
| // duplicate publishes or receives. |
| type DuplicateMsgDetector struct { |
| mu sync.Mutex |
| // Map of Pub/Sub message data and associated metadata. |
| msgs map[string]*msgMetadata |
| duplicatePublishCount int64 |
| duplicateReceiveCount int64 |
| } |
| |
| // NewDuplicateMsgDetector creates a new DuplicateMsgDetector. |
| func NewDuplicateMsgDetector() *DuplicateMsgDetector { |
| return &DuplicateMsgDetector{ |
| msgs: make(map[string]*msgMetadata), |
| } |
| } |
| |
| // Receive checks the given message data and offset. |
| func (dm *DuplicateMsgDetector) Receive(data string, offset int64) { |
| dm.mu.Lock() |
| defer dm.mu.Unlock() |
| |
| if metadata, exists := dm.msgs[data]; exists { |
| if metadata.ContainsOffset(offset) { |
| // If the message contains the same offset, it means it was received |
| // multiple times. This is not expected within a single test run. But it |
| // is normal when processes are stopped & restarted without committing |
| // cursors. |
| dm.duplicateReceiveCount++ |
| } else { |
| // If the message contains a different offset, it means a message was |
| // republished, which can occur when a publish stream reconnects with |
| // in-flight published messages. |
| dm.duplicatePublishCount++ |
| metadata.AddOffset(offset) |
| } |
| } else { |
| metadata = newMsgMetadata() |
| metadata.AddOffset(offset) |
| dm.msgs[data] = metadata |
| } |
| } |
| |
| // Status returns a non-empty status string if there were duplicates detected. |
| func (dm *DuplicateMsgDetector) Status() string { |
| dm.mu.Lock() |
| defer dm.mu.Unlock() |
| |
| if (dm.duplicateReceiveCount + dm.duplicatePublishCount) == 0 { |
| return "" |
| } |
| return fmt.Sprintf("duplicate publish count = %d, receive count = %d", dm.duplicatePublishCount, dm.duplicateReceiveCount) |
| } |
| |
| // HasPublishDuplicates returns true if duplicate published messages were |
| // detected. |
| func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool { |
| dm.mu.Lock() |
| defer dm.mu.Unlock() |
| return dm.duplicatePublishCount > 0 |
| } |
| |
| // HasReceiveDuplicates returns true if duplicate received messages were |
| // detected. |
| func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool { |
| dm.mu.Lock() |
| defer dm.mu.Unlock() |
| return dm.duplicateReceiveCount > 0 |
| } |