feat(bigquery/storage/managedwriter): improve method parity in managedwriter (#5007)

This PR exposes the raw methods for creating and committing write streams on the managedwriter client.

It allows users to interact with all the methods of the underlying API through the managedwriter client (which itself wraps the raw v1 client).  The disadvantage is that this couples managedwriter directly to v1, since the new methods accept requests in the v1 namespace; the existing append interactions, by contrast, use abstractions local to the managedwriter package.
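
For illustration, a caller can now reach these passthrough methods with the v1 request types directly. A minimal sketch, assuming placeholder project/dataset/table identifiers:

```go
package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery/storage/managedwriter"
	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
)

func main() {
	ctx := context.Background()
	// NOTE: project, dataset, and table names below are placeholders.
	client, err := managedwriter.NewClient(ctx, "my-project")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create an explicit PENDING write stream using the raw v1 request type.
	ws, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: "projects/my-project/datasets/my_dataset/tables/my_table",
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("created stream: %s", ws.GetName())
}
```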

This PR also removes the BatchCommit utility method for batch-committing write streams; it saved little effort over calling the underlying method directly.
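
Callers that used the removed helper now build the v1 request themselves, as the updated integration test below does. Continuing the sketch above (finalization of the stream is omitted here):

```go
// Batch-commit the pending stream against its parent table.
req := &storagepb.BatchCommitWriteStreamsRequest{
	Parent:       managedwriter.TableParentFromStreamName(ws.GetName()),
	WriteStreams: []string{ws.GetName()},
}
resp, err := client.BatchCommitWriteStreams(ctx, req)
if err != nil {
	log.Fatal(err)
}
// Per the API docs, stream errors in the response mean nothing was committed.
if len(resp.GetStreamErrors()) > 0 {
	log.Fatalf("batch commit failed: %v", resp.GetStreamErrors())
}
```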

Towards: https://github.com/googleapis/google-cloud-go/issues/4366
diff --git a/bigquery/storage/managedwriter/client.go b/bigquery/storage/managedwriter/client.go
index 51705f6..7ce771a 100644
--- a/bigquery/storage/managedwriter/client.go
+++ b/bigquery/storage/managedwriter/client.go
@@ -177,28 +177,24 @@
 	return nil
 }
 
-// BatchCommit is used to commit one or more PendingStream streams belonging to the same table
-// as a single transaction.  Streams must be finalized before committing.
+// BatchCommitWriteStreams atomically commits a group of PENDING streams that belong to the same
+// parent table.
 //
-// Format of the parentTable is: projects/{project}/datasets/{dataset}/tables/{table} and the utility
-// function TableParentFromStreamName can be used to derive this from a Stream's name.
-//
-// If the returned response contains stream errors, this indicates that the batch commit failed and no data was
-// committed.
-//
-// TODO: currently returns the raw response.  Determine how we want to surface StreamErrors.
-func (c *Client) BatchCommit(ctx context.Context, parentTable string, streamNames []string) (*storagepb.BatchCommitWriteStreamsResponse, error) {
+// Streams must be finalized before commit and cannot be committed multiple
+// times. Once a stream is committed, data in the stream becomes available
+// for read operations.
+func (c *Client) BatchCommitWriteStreams(ctx context.Context, req *storagepb.BatchCommitWriteStreamsRequest, opts ...gax.CallOption) (*storagepb.BatchCommitWriteStreamsResponse, error) {
+	return c.rawClient.BatchCommitWriteStreams(ctx, req, opts...)
+}
 
-	// determine table from first streamName, as all must share the same table.
-	if len(streamNames) <= 0 {
-		return nil, fmt.Errorf("no streamnames provided")
-	}
-
-	req := &storagepb.BatchCommitWriteStreamsRequest{
-		Parent:       TableParentFromStreamName(streamNames[0]),
-		WriteStreams: streamNames,
-	}
-	return c.rawClient.BatchCommitWriteStreams(ctx, req)
+// CreateWriteStream creates a write stream to the given table.
+// Additionally, every table has a special stream named ‘_default’
+// to which data can be written. This stream doesn’t need to be created using
+// CreateWriteStream. It is a stream that can be used simultaneously by any
+// number of clients. Data written to this stream is considered committed as
+// soon as an acknowledgement is received.
+func (c *Client) CreateWriteStream(ctx context.Context, req *storagepb.CreateWriteStreamRequest, opts ...gax.CallOption) (*storagepb.WriteStream, error) {
+	return c.rawClient.CreateWriteStream(ctx, req, opts...)
 }
 
 // getWriteStream returns information about a given write stream.
diff --git a/bigquery/storage/managedwriter/integration_test.go b/bigquery/storage/managedwriter/integration_test.go
index 7f047de..d0ebca0 100644
--- a/bigquery/storage/managedwriter/integration_test.go
+++ b/bigquery/storage/managedwriter/integration_test.go
@@ -28,6 +28,7 @@
 	"cloud.google.com/go/internal/uid"
 	"go.opencensus.io/stats/view"
 	"google.golang.org/api/option"
+	storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1"
 	"google.golang.org/protobuf/encoding/protojson"
 	"google.golang.org/protobuf/proto"
 	"google.golang.org/protobuf/reflect/protodesc"
@@ -415,7 +416,12 @@
 	}
 
 	// Commit stream and validate.
-	resp, err := mwClient.BatchCommit(ctx, TableParentFromStreamName(ms.StreamName()), []string{ms.StreamName()})
+	req := &storagepb.BatchCommitWriteStreamsRequest{
+		Parent:       TableParentFromStreamName(ms.StreamName()),
+		WriteStreams: []string{ms.StreamName()},
+	}
+
+	resp, err := mwClient.BatchCommitWriteStreams(ctx, req)
 	if err != nil {
 		t.Errorf("client.BatchCommit: %v", err)
 	}