feat(bigtable/cbt): cbt 'import' cmd to parse a .csv file and write to CBT (#5072)

Co-authored-by: Christopher Wilcox <crwilcox@google.com>
diff --git a/bigtable/cmd/cbt/cbt.go b/bigtable/cmd/cbt/cbt.go
old mode 100644
new mode 100755
index a68ddc2..e69d466
--- a/bigtable/cmd/cbt/cbt.go
+++ b/bigtable/cmd/cbt/cbt.go
@@ -32,6 +32,7 @@
 	"sort"
 	"strconv"
 	"strings"
+	"sync"
 	"text/tabwriter"
 	"text/template"
 	"time"
@@ -410,6 +411,23 @@
 		Required: cbtconfig.NoneRequired,
 	},
 	{
+		Name: "import",
+		Desc: "Batch write many rows based on the input file",
+		do:   doImport,
+		Usage: "cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]\n" +
+			"  app-profile=<app-profile-id>          The app profile ID to use for the request\n" +
+			"  column-family=<family-name>           The column family label to use\n" +
+			"  batch-size=<500>                      The max number of rows per batch write request\n" +
+			"  workers=<1>                           The number of worker threads\n\n" +
+			"  Import data from a csv file into an existing cbt table that has the required column families.\n" +
+			"  See <example.csv.github.com/cbt-import-sample.csv> for a sample .csv file and formatting.\n" +
+			"  If no column family row is present, use the column-family flag to specify an existing family.\n\n" +
+			"  Examples:\n" +
+			"    cbt import csv-import-table cbt-import-sample.csv\n" +
+			"    cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5\n",
+		Required: cbtconfig.ProjectAndInstanceRequired,
+	},
+	{
 		Name:     "listinstances",
 		Desc:     "List instances in a project",
 		do:       doListInstances,
@@ -1534,6 +1552,187 @@
 	}
 }
 
+type importerArgs struct {
+	appProfile string
+	fam        string
+	sz         int
+	workers    int
+}
+
+type safeReader struct {
+	mu sync.Mutex
+	r  *csv.Reader
+	t  int // total rows
+}
+
+func doImport(ctx context.Context, args ...string) {
+	ia, err := parseImporterArgs(ctx, args)
+	if err != nil {
+		log.Fatalf("error parsing importer args: %s", err)
+	}
+	f, err := os.Open(args[1])
+	if err != nil {
+		log.Fatalf("couldn't open the csv file: %s", err)
+	}
+
+	tbl := getClient(bigtable.ClientConfig{AppProfile: ia.appProfile}).Open(args[0])
+	r := csv.NewReader(f)
+	importCSV(ctx, tbl, r, ia)
+}
+
+func parseImporterArgs(ctx context.Context, args []string) (importerArgs, error) {
+	var err error
+	ia := importerArgs{
+		fam:     "",
+		sz:      500,
+		workers: 1,
+	}
+	if len(args) < 2 {
+		return ia, fmt.Errorf("usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]")
+	}
+	for _, arg := range args[2:] {
+		switch {
+		case strings.HasPrefix(arg, "app-profile="):
+			ia.appProfile = strings.Split(arg, "=")[1]
+		case strings.HasPrefix(arg, "column-family="):
+			ia.fam = strings.Split(arg, "=")[1]
+			if ia.fam == "" {
+				return ia, fmt.Errorf("column-family cannot be ''")
+			}
+		case strings.HasPrefix(arg, "batch-size="):
+			ia.sz, err = strconv.Atoi(strings.Split(arg, "=")[1])
+			if err != nil || ia.sz <= 0 || ia.sz >= 100000 {
+				return ia, fmt.Errorf("batch-size must be > 0 and <= 100000")
+			}
+		case strings.HasPrefix(arg, "workers="):
+			ia.workers, err = strconv.Atoi(strings.Split(arg, "=")[1])
+			if err != nil || ia.workers <= 0 {
+				return ia, fmt.Errorf("workers must be > 0, err:%s", err)
+			}
+		}
+	}
+	return ia, nil
+}
+
+func importCSV(ctx context.Context, tbl *bigtable.Table, r *csv.Reader, ia importerArgs) {
+	fams, cols, err := parseCsvHeaders(r, ia.fam)
+	if err != nil {
+		log.Fatalf("error parsing headers: %s", err)
+	}
+	sr := safeReader{r: r}
+	ts := bigtable.Now()
+
+	var wg sync.WaitGroup
+	wg.Add(ia.workers)
+	for i := 0; i < ia.workers; i++ {
+		go func(w int) {
+			defer wg.Done()
+			if e := sr.parseAndWrite(ctx, tbl, fams, cols, ts, ia.sz, w); e != nil {
+				log.Fatalf("error: %s", e)
+			}
+		}(i)
+	}
+	wg.Wait()
+	log.Printf("Done importing %d rows.\n", sr.t)
+}
+
+func parseCsvHeaders(r *csv.Reader, family string) ([]string, []string, error) {
+	var err error
+	var fams, cols []string
+	if family == "" { // no column-family from flag, using first row
+		fams, err = r.Read()
+		if err != nil {
+			return nil, nil, fmt.Errorf("family header reader error:%s", err)
+		}
+	}
+	cols, err = r.Read() // column names are next row
+	if err != nil {
+		return nil, nil, fmt.Errorf("columns header reader error:%s", err)
+	}
+	if family != "" {
+		fams = make([]string, len(cols))
+		fams[1] = family
+	}
+	if len(fams) < 2 || len(cols) < 2 {
+		return fams, cols, fmt.Errorf("at least 2 columns are required (rowkey + data)")
+	}
+	if fams[0] != "" || cols[0] != "" {
+		return fams, cols, fmt.Errorf("the first column must be empty for column-family and column name rows")
+	}
+	if fams[1] == "" || cols[1] == "" {
+		return fams, cols, fmt.Errorf("the second column (first data column) must have values for column family and column name rows if present")
+	}
+	for i := range cols { // fill any blank column families with previous
+		if i > 0 && fams[i] == "" {
+			fams[i] = fams[i-1]
+		}
+	}
+	return fams, cols, nil
+}
+
+func batchWrite(ctx context.Context, tbl *bigtable.Table, rk []string, muts []*bigtable.Mutation, worker int) (int, error) {
+	log.Printf("[%d] Writing batch:: size: %d, firstRowKey: %s, lastRowKey: %s\n", worker, len(rk), rk[0], rk[len(rk)-1])
+	errors, err := tbl.ApplyBulk(ctx, rk, muts)
+	if err != nil {
+		return 0, fmt.Errorf("applying bulk mutations process error: %v", err)
+	}
+	if errors != nil {
+		return 0, fmt.Errorf("applying bulk mutations had %d errors, first:%v", len(errors), errors[0])
+
+	}
+	return len(rk), nil
+}
+
+func (sr *safeReader) parseAndWrite(ctx context.Context, tbl *bigtable.Table, fams, cols []string, ts bigtable.Timestamp, max, worker int) error {
+	var rowKey []string
+	var muts []*bigtable.Mutation
+	var c int
+	for {
+		sr.mu.Lock()
+		for len(rowKey) < max {
+			line, err := sr.r.Read()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				log.Fatal(err)
+			}
+			mut := bigtable.NewMutation()
+			empty := true
+			for i, val := range line {
+				if i > 0 && val != "" {
+					mut.Set(fams[i], cols[i], ts, []byte(val))
+					empty = false
+				}
+			}
+			if empty {
+				log.Printf("[%d] RowKey '%s' has no mutations, skipping", worker, line[0])
+				continue
+			}
+			if line[0] == "" {
+				log.Printf("[%d] RowKey not present, skipping line", worker)
+				continue
+			}
+			rowKey = append(rowKey, line[0])
+			muts = append(muts, mut)
+		}
+		if len(rowKey) > 0 {
+			sr.mu.Unlock()
+			n, err := batchWrite(ctx, tbl, rowKey, muts, worker)
+			if err != nil {
+				return err
+			}
+			c += n
+			rowKey = rowKey[:0]
+			muts = muts[:0]
+			continue
+		}
+		sr.t += c
+		sr.mu.Unlock()
+		return nil
+	}
+}
+
 // parseDuration parses a duration string.
 // It is similar to Go's time.ParseDuration, except with a different set of supported units,
 // and only simple formats supported.
diff --git a/bigtable/cmd/cbt/cbt_test.go b/bigtable/cmd/cbt/cbt_test.go
index e7c6ea2..fa8e401 100644
--- a/bigtable/cmd/cbt/cbt_test.go
+++ b/bigtable/cmd/cbt/cbt_test.go
@@ -15,12 +15,20 @@
 package main
 
 import (
+	"bytes"
+	"context"
+	"encoding/csv"
+	"fmt"
+	"strings"
 	"testing"
 	"time"
 
 	"cloud.google.com/go/bigtable"
+	"cloud.google.com/go/bigtable/bttest"
 	"cloud.google.com/go/internal/testutil"
 	"github.com/google/go-cmp/cmp"
+	"google.golang.org/api/option"
+	"google.golang.org/grpc"
 )
 
 func TestParseDuration(t *testing.T) {
@@ -171,3 +179,408 @@
 		}
 	}
 }
+
+// Check if we get a substring of the expected error.
+// Returns "" if so, else returns the expected substring and error.
+func matchesExpectedError(want string, err error) string {
+	if err != nil {
+		got := err.Error()
+		if want == "" || !strings.Contains(got, want) {
+			return fmt.Sprintf("expected error substr:%s, got:%s", want, got)
+		}
+	} else if want != "" {
+		return fmt.Sprintf("expected error substr:%s", want)
+	}
+	return ""
+}
+
+func TestCsvImporterArgs(t *testing.T) {
+	tests := []struct {
+		in  []string
+		out importerArgs
+		err string
+	}{
+		{in: []string{"my-table", "my-file.csv"}, out: importerArgs{"", "", 500, 1}},
+		{in: []string{"my-table", "my-file.csv", "app-profile="}, out: importerArgs{"", "", 500, 1}},
+		{in: []string{"my-table", "my-file.csv", "app-profile=my-ap", "column-family=my-family", "batch-size=100", "workers=20"},
+			out: importerArgs{"my-ap", "my-family", 100, 20}},
+
+		{in: []string{}, err: "usage: cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]"},
+		{in: []string{"my-table", "my-file.csv", "column-family="}, err: "column-family cannot be ''"},
+		{in: []string{"my-table", "my-file.csv", "batch-size=-5"}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "batch-size=5000000"}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "batch-size=nan"}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "batch-size="}, err: "batch-size must be > 0 and <= 100000"},
+		{in: []string{"my-table", "my-file.csv", "workers=0"}, err: "workers must be > 0, err:%!s(<nil>)"},
+		{in: []string{"my-table", "my-file.csv", "workers=nan"}, err: "workers must be > 0, err:strconv.Atoi: parsing \"nan\": invalid syntax"},
+		{in: []string{"my-table", "my-file.csv", "workers="}, err: "workers must be > 0, err:strconv.Atoi: parsing \"\": invalid syntax"},
+	}
+	for _, tc := range tests {
+		got, err := parseImporterArgs(context.Background(), tc.in)
+		if e := matchesExpectedError(tc.err, err); e != "" {
+			t.Errorf("%s", e)
+			continue
+		}
+		if tc.err != "" {
+			continue // received expected error, do not parse below
+		}
+		if got.appProfile != tc.out.appProfile ||
+			got.fam != tc.out.fam ||
+			got.sz != tc.out.sz ||
+			got.workers != tc.out.workers {
+			t.Errorf("parseImportArgs(%q) did not fail, out: %q", tc.in, got)
+		}
+	}
+}
+
+func transformToCsvBuffer(data [][]string) ([]byte, error) {
+	if len(data) == 0 {
+		return nil, fmt.Errorf("Data cannot be empty")
+	}
+	var buf bytes.Buffer
+	csvWriter := csv.NewWriter(&buf)
+	if err := csvWriter.WriteAll(data); err != nil {
+		return nil, err
+	}
+	csvWriter.Flush()
+	if err := csvWriter.Error(); err != nil {
+		return nil, err
+	}
+	return buf.Bytes(), nil
+}
+
+func TestCsvHeaderParser(t *testing.T) {
+	tests := []struct {
+		label    string
+		iData    [][]string
+		iFam     string
+		oFams    []string
+		oCols    []string
+		nextLine []string
+		err      string
+	}{
+		{label: "extend-family-gap",
+			iData:    [][]string{{"", "my-family", "", "my-family-2"}, {"", "col-1", "col-2", "col-3"}, {"rk-1", "A", "", ""}},
+			iFam:     "",
+			oFams:    []string{"", "my-family", "my-family", "my-family-2"},
+			oCols:    []string{"", "col-1", "col-2", "col-3"},
+			nextLine: []string{"rk-1", "A", "", ""}},
+		{label: "handle-family-arg",
+			iData:    [][]string{{"", "col-1", "col-2"}, {"rk-1", "A", ""}},
+			iFam:     "arg-family",
+			oFams:    []string{"", "arg-family", "arg-family"},
+			oCols:    []string{"", "col-1", "col-2"},
+			nextLine: []string{"rk-1", "A", ""}},
+
+		{label: "eof-header-family",
+			iData: [][]string{{""}},
+			iFam:  "",
+			err:   "family header reader error:EOF"},
+		{label: "eof-header-column",
+			iData: [][]string{{""}, {""}},
+			iFam:  "arg-family",
+			err:   "columns header reader error:EOF"},
+		{label: "rowkey-in-header-row",
+			iData: [][]string{{"ABC", "my-family", ""}},
+			iFam:  "arg-family",
+			err:   "the first column must be empty for column-family and column name rows"},
+		{label: "blank-first-headers",
+			iData: [][]string{{"", "", ""}},
+			iFam:  "arg-family",
+			err:   "the second column (first data column) must have values for column family and column name rows if present"},
+	}
+
+	for _, tc := range tests {
+		// create in memory csv like file
+		byteData, err := transformToCsvBuffer(tc.iData)
+		if err != nil {
+			t.Fatal(err)
+		}
+		reader := csv.NewReader(bytes.NewReader(byteData))
+
+		fams, cols, err := parseCsvHeaders(reader, tc.iFam)
+		if e := matchesExpectedError(tc.err, err); e != "" {
+			t.Errorf("%s %s", tc.label, e)
+			continue
+		}
+		if tc.err != "" {
+			continue // received expected error, do not parse below
+		}
+
+		line, _ := reader.Read()
+		if err != nil {
+			t.Errorf("Next line for reader error, got: %q, expect: %q, error:%s", line, tc.nextLine, err)
+			continue
+		}
+		if len(fams) != len(tc.oFams) ||
+			len(cols) != len(tc.oCols) ||
+			len(line) != len(tc.nextLine) {
+			t.Errorf("parseCsvHeaders() did not fail, incorrect output sizes found, fams: %d, cols:%d, line:%d", len(fams), len(cols), len(line))
+			continue
+		}
+		for i, f := range fams {
+			if f != tc.oFams[i] {
+				t.Errorf("Incorrect column families idx:%d, got: %q, want %q", i, fams[i], tc.oFams[i])
+				continue
+			}
+		}
+		for i, c := range cols {
+			if c != tc.oCols[i] {
+				t.Errorf("parseCsvHeaders() did not fail for column names idx:%d, got: %q, want %q", i, cols[i], tc.oCols[i])
+				continue
+			}
+		}
+		for i, v := range line {
+			if v != tc.nextLine[i] {
+				t.Errorf("parseCsvHeaders() did not fail for next line idx:%d, got: %q, want %q", i, cols[i], tc.oCols[i])
+				continue
+			}
+		}
+	}
+}
+
+func setupEmulator(t *testing.T, tables, families []string) (context.Context, *bigtable.Client) {
+	srv, err := bttest.NewServer("localhost:0")
+	if err != nil {
+		t.Fatalf("Error starting bttest server: %s", err)
+	}
+
+	ctx := context.Background()
+
+	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
+	if err != nil {
+		t.Fatalf("Error %s", err)
+	}
+
+	proj, instance := "proj", "instance"
+	adminClient, err := bigtable.NewAdminClient(ctx, proj, instance, option.WithGRPCConn(conn))
+	if err != nil {
+		t.Fatalf("Error %s", err)
+	}
+
+	for _, ta := range tables {
+		if err = adminClient.CreateTable(ctx, ta); err != nil {
+			t.Fatalf("Error %s", err)
+		}
+		for _, f := range families {
+			if err = adminClient.CreateColumnFamily(ctx, ta, f); err != nil {
+				t.Fatalf("Error %s", err)
+			}
+		}
+	}
+
+	client, err := bigtable.NewClient(ctx, proj, instance, option.WithGRPCConn(conn))
+	if err != nil {
+		t.Fatalf("Error %s", err)
+	}
+
+	return ctx, client
+}
+
+func validateData(ctx context.Context, tbl *bigtable.Table, fams, cols []string, rowData [][]string) error {
+	// vaildate table entries, valMap["rowkey:family:column"] = mutation value
+	valMap := make(map[string]string)
+	for _, row := range rowData {
+		for i, val := range row {
+			if i > 0 && val != "" {
+				valMap[row[0]+":"+fams[i]+":"+cols[i]] = val
+			}
+		}
+	}
+	for _, data := range rowData {
+		row, err := tbl.ReadRow(ctx, data[0])
+		if err != nil {
+			return err
+		}
+		for _, cf := range row {
+			for _, column := range cf {
+				k := data[0] + ":" + string(column.Column)
+				v, ok := valMap[k]
+				if ok && v == string(column.Value) {
+					delete(valMap, k)
+				}
+			}
+		}
+	}
+	if len(valMap) != 0 {
+		return fmt.Errorf("Data didn't match after read, not found %v", valMap)
+	}
+	return nil
+}
+
+func TestCsvParseAndWrite(t *testing.T) {
+	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "my-family-2"})
+
+	tbl := client.Open("my-table")
+	fams := []string{"", "my-family", "my-family-2"}
+	cols := []string{"", "col-1", "col-2"}
+	rowData := [][]string{
+		{"rk-0", "A", "B"},
+		{"rk-1", "", "C"},
+	}
+
+	byteData, err := transformToCsvBuffer(rowData)
+	if err != nil {
+		t.Fatal(err)
+	}
+	reader := csv.NewReader(bytes.NewReader(byteData))
+
+	sr := safeReader{r: reader}
+	if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err != nil {
+		t.Fatalf("parseAndWrite() failed unexpectedly, error:%s", err)
+	}
+
+	if err := validateData(ctx, tbl, fams, cols, rowData); err != nil {
+		t.Fatalf("Read back validation error:%s", err)
+	}
+}
+
+func TestCsvParseAndWriteBadFamily(t *testing.T) {
+	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family"})
+
+	tbl := client.Open("my-table")
+	fams := []string{"", "my-family", "not-my-family"}
+	cols := []string{"", "col-1", "col-2"}
+	rowData := [][]string{
+		{"rk-0", "A", "B"},
+		{"rk-1", "", "C"},
+	}
+
+	byteData, err := transformToCsvBuffer(rowData)
+	if err != nil {
+		t.Fatal(err)
+	}
+	reader := csv.NewReader(bytes.NewReader(byteData))
+
+	sr := safeReader{r: reader}
+	if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err == nil {
+		t.Fatalf("parseAndWrite() should have failed with non-existant column family")
+	}
+}
+
+func TestCsvParseAndWriteDuplicateRowkeys(t *testing.T) {
+	ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family"})
+
+	tbl := client.Open("my-table")
+	fams := []string{"", "my-family", "my-family"}
+	cols := []string{"", "col-1", "col-2"}
+	rowData := [][]string{
+		{"rk-0", "A", ""},
+		{"rk-0", "", "B"},
+		{"rk-0", "C", ""},
+	}
+
+	byteData, err := transformToCsvBuffer(rowData)
+	if err != nil {
+		t.Fatal(err)
+	}
+	reader := csv.NewReader(bytes.NewReader(byteData))
+
+	sr := safeReader{r: reader}
+	if err = sr.parseAndWrite(ctx, tbl, fams, cols, 1, 1, 1); err != nil {
+		t.Fatalf("parseAndWrite() should not have failed for duplicate rowkeys: %s", err)
+	}
+
+	// the "A" not present result is expected, the emulator only keeps 1 version
+	valMap := map[string]bool{"rk-0:my-family:col-2:B": true, "rk-0:my-family:col-1:C": true}
+	row, err := tbl.ReadRow(ctx, "rk-0")
+	if err != nil {
+		t.Errorf("error %s", err)
+	}
+	for _, cf := range row { // each column family in row
+		for _, column := range cf { // each cf:column, aka each mutation
+			k := "rk-0:" + string(column.Column) + ":" + string(column.Value)
+			_, ok := valMap[k]
+			if ok {
+				delete(valMap, k)
+				continue
+			}
+			t.Errorf("row data not found for %s\n", k)
+		}
+	}
+
+	if len(valMap) != 0 {
+		t.Fatalf("values were not present in table: %v", valMap)
+	}
+}
+
+func TestCsvToCbt(t *testing.T) {
+	tests := []struct {
+		label        string
+		ia           importerArgs
+		csvData      [][]string
+		expectedFams []string
+		dataStartIdx int
+	}{
+		{
+			label: "has-column-families",
+			ia:    importerArgs{fam: "", sz: 1, workers: 1},
+			csvData: [][]string{
+				{"", "my-family", ""},
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", ""},
+			},
+			expectedFams: []string{"", "my-family", "my-family"},
+			dataStartIdx: 2,
+		},
+		{
+			label: "no-column-families",
+			ia:    importerArgs{fam: "arg-family", sz: 1, workers: 1},
+			csvData: [][]string{
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", "D"},
+			},
+			expectedFams: []string{"", "arg-family", "arg-family"},
+			dataStartIdx: 1,
+		},
+		{
+			label: "larger-batches",
+			ia:    importerArgs{fam: "arg-family", sz: 100, workers: 1},
+			csvData: [][]string{
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", "D"},
+			},
+			expectedFams: []string{"", "arg-family", "arg-family"},
+			dataStartIdx: 1,
+		},
+		{
+			label: "many-workers",
+			ia:    importerArgs{fam: "arg-family", sz: 1, workers: 20},
+			csvData: [][]string{
+				{"", "col-1", "col-2"},
+				{"rk-0", "A", ""},
+				{"rk-1", "", "B"},
+				{"rk-2", "", ""},
+				{"rk-3", "C", "D"},
+			},
+			expectedFams: []string{"", "arg-family", "arg-family"},
+			dataStartIdx: 1,
+		},
+	}
+
+	for _, tc := range tests {
+		ctx, client := setupEmulator(t, []string{"my-table"}, []string{"my-family", "arg-family"})
+		tbl := client.Open("my-table")
+
+		byteData, err := transformToCsvBuffer(tc.csvData)
+		if err != nil {
+			t.Fatal(err)
+		}
+		reader := csv.NewReader(bytes.NewReader(byteData))
+
+		importCSV(ctx, tbl, reader, tc.ia)
+
+		if err := validateData(ctx, tbl, tc.expectedFams, tc.csvData[tc.dataStartIdx-1], tc.csvData[tc.dataStartIdx:]); err != nil {
+			t.Fatalf("Read back validation error: %s", err)
+		}
+	}
+}
diff --git a/bigtable/cmd/cbt/cbtdoc.go b/bigtable/cmd/cbt/cbtdoc.go
index 4dc0f9b..eb700f4 100644
--- a/bigtable/cmd/cbt/cbtdoc.go
+++ b/bigtable/cmd/cbt/cbtdoc.go
@@ -42,6 +42,7 @@
     deletetable               Delete a table
     doc                       Print godoc-suitable documentation for cbt
     help                      Print help text
+    import                    Batch write many rows based on the input file
     listinstances             List instances in a project
     listclusters              List clusters in an instance
     lookup                    Read from a single row
@@ -274,6 +275,27 @@
 
 
 
+Batch write many rows based on the input file
+
+Usage:
+	cbt import <table-id> <input-file> [app-profile=<app-profile-id>] [column-family=<family-name>] [batch-size=<500>] [workers=<1>]
+	  app-profile=<app-profile-id>          The app profile ID to use for the request
+	  column-family=<family-name>           The column family label to use
+	  batch-size=<500>                      The max number of rows per batch write request
+	  workers=<1>                           The number of worker threads
+
+	  Import data from a csv file into an existing cbt table that has the required column families.
+	  See <example.csv.github.com/cbt-import-sample.csv> for a sample .csv file and formatting.
+	  If no column family row is present, use the column-family flag to specify an existing family.
+
+	  Examples:
+	    cbt import csv-import-table cbt-import-sample.csv
+	    cbt import csv-import-table cbt-import-sample.csv app-profile=batch-write-profile column-family=my-family workers=5
+
+
+
+
+
 List instances in a project
 
 Usage: