Commit
adranwit committed Feb 3, 2020
2 parents 14bb36a + 88b294e commit 91d927e
Showing 6 changed files with 104 additions and 28 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,15 @@
## January 8 2020 0.7.4
* Updated adjustDataType to support []string, []integer, []float, and bytes

## September 11 2019 0.7.3
* Patched column metadata for columns with REPEATED mode

## August 7 2019 0.7.2
* Updated logic for retrying jobs with a shared job reference

## August 5 2019 0.7.1
* Added pageSize config parameter to control MaxResults on the query iterator

## April 16 2019 0.7.0
* Integrated rate limiter
* Minor refactoring
9 changes: 8 additions & 1 deletion README.md
@@ -35,7 +35,7 @@ For streaming you can specify which column to use as insertId with the following

###### streamBatchCount

streamBatchCount controls row cound in batch (default 9999)
streamBatchCount controls row count in batch (default 9999)

###### insertWaitTimeoutInMs

@@ -53,6 +53,13 @@ Retries insert when 503 internal error

Default dataset

###### pageSize

The maximum number of rows of data to return per page of results (default 500).
In addition to this limit, responses are also limited to 10 MB.
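
As a sketch of how the driver consumes this parameter (the wrapper below is hypothetical; only the `Config().GetInt("pageSize", ...)` lookup is taken from this commit's query_iterator.go):

```go
package example

import "github.com/viant/dsc"

// resolvePageSize mirrors how fetchPage reads the setting in this commit:
// the "pageSize" config parameter, falling back to the default of 500 rows.
func resolvePageSize(manager dsc.Manager) int64 {
	return int64(manager.Config().GetInt("pageSize", 500))
}
```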


## Credentials

6 changes: 5 additions & 1 deletion dialect.go
@@ -250,7 +250,11 @@ func (d dialect) GetColumns(manager dsc.Manager, datastore, table string) ([]dsc
return nil, fmt.Errorf("table schema was empty %v", table)
}
for _, column := range bqTable.Schema.Fields {
var tableColumn = dsc.NewSimpleColumn(column.Name, column.Type)
columnType := column.Type
if column.Mode == "REPEATED" {
columnType = "[]" + columnType
}
var tableColumn = dsc.NewSimpleColumn(column.Name, columnType)
result = append(result, tableColumn)
}
return result, nil
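
To illustrate the mapping this hunk introduces, a standalone sketch (the field literal is hypothetical; the types come from google.golang.org/api/bigquery/v2 and viant/dsc, both used in this diff):

```go
package main

import (
	"fmt"

	"github.com/viant/dsc"
	bigquery "google.golang.org/api/bigquery/v2"
)

// A REPEATED BigQuery field gets a slice-style type name, which
// adjustDataType in insert.go later matches case-insensitively
// (e.g. "[]STRING" -> "[]string").
func main() {
	field := &bigquery.TableFieldSchema{Name: "tags", Type: "STRING", Mode: "REPEATED"}
	columnType := field.Type
	if field.Mode == "REPEATED" {
		columnType = "[]" + columnType // "STRING" -> "[]STRING"
	}
	column := dsc.NewSimpleColumn(field.Name, columnType)
	fmt.Println(column.DatabaseTypeName()) // prints "[]STRING"
}
```
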
96 changes: 73 additions & 23 deletions insert.go
@@ -39,8 +39,9 @@ type InsertTask struct {
manager dsc.Manager
insertMethod string
insertIdColumn string
maxRetries int
attempts int
columns map[string]dsc.Column
jobReference *bigquery.JobReference
}

//InsertSingle streams a single record into big query.
@@ -113,18 +114,66 @@ func (it *InsertTask) asJSONMap(record interface{}) map[string]bigquery.JsonValue {
continue
}
if column, ok := it.columns[k]; ok {
switch strings.ToLower(column.DatabaseTypeName()) {
case "boolean":
val = toolbox.AsBoolean(val)
case "float":
val = toolbox.AsFloat(val)
}
val = it.adjustDataType(column, val)
}
jsonValues[k] = val
}
return jsonValues
}

func (it *InsertTask) adjustDataType(column dsc.Column, val interface{}) interface{} {
switch strings.ToLower(column.DatabaseTypeName()) {
case "[]string":
if !toolbox.IsSlice(val) {
text := toolbox.AsString(val)
sep := getSeparator(text)
val = strings.Split(strings.TrimSpace(text), sep)
}
case "[]integer":
if !toolbox.IsSlice(val) {
text := toolbox.AsString(val)
sep := getSeparator(text)
items := strings.Split(strings.TrimSpace(text), sep)
var values = make([]int, 0)
for _, item := range items {
values = append(values, toolbox.AsInt(item))
}
val = values
}
case "[]float":
if !toolbox.IsSlice(val) {
text := toolbox.AsString(val)
sep := getSeparator(text)
items := strings.Split(strings.TrimSpace(text), sep)
var values = make([]float64, 0)
for _, item := range items {
values = append(values, toolbox.AsFloat(item))
}
val = values
}
case "bytes":
bs, ok := val.([]byte)
if !ok {
bs = []byte(toolbox.AsString(val))
}
val = bs
case "boolean":
val = toolbox.AsBoolean(val)
case "float":
val = toolbox.AsFloat(val)
}
return val
}


func getSeparator(text string) string {
sep := ","
if !strings.Contains(text, sep) {
sep = " "
}
return sep
}
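
A standalone sketch of the coercion above (it inlines the []integer branch and the separator detection rather than calling the unexported methods):

```go
package main

import (
	"fmt"
	"strings"

	"github.com/viant/toolbox"
)

// Mirrors adjustDataType's "[]integer" branch: a scalar such as "1,2,3"
// or "4 5 6" is split on comma, or on space when no comma is present,
// and each item is coerced to int.
func main() {
	for _, text := range []string{"1,2,3", "4 5 6"} {
		sep := ","
		if !strings.Contains(text, sep) {
			sep = " "
		}
		var values = make([]int, 0)
		for _, item := range strings.Split(strings.TrimSpace(text), sep) {
			values = append(values, toolbox.AsInt(item))
		}
		fmt.Println(values) // [1 2 3] then [4 5 6]
	}
}
```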

func (it *InsertTask) asMap(record interface{}) map[string]interface{} {
var jsonValues = make(map[string]interface{})
for k, v := range toolbox.AsMap(record) {
@@ -133,12 +182,7 @@ func (it *InsertTask) asMap(record interface{}) map[string]interface{} {
continue
}
if column, ok := it.columns[k]; ok {
switch strings.ToLower(column.DatabaseTypeName()) {
case "boolean":
val = toolbox.AsBoolean(val)
case "float":
val = toolbox.AsFloat(val)
}
val = it.adjustDataType(column, val)
}
jsonValues[k] = val
}
@@ -191,8 +235,10 @@ func (it *InsertTask) LoadAll(data interface{}) (int, error) {
//Insert submits a load job that loads all records from the reader into big query, returning an error on failure.
func (it *InsertTask) Insert(reader io.Reader) error {
req := &bigquery.Job{
JobReference: it.jobReference,
Configuration: &bigquery.JobConfiguration{
Load: &bigquery.JobConfigurationLoad{

SourceFormat: jsonFormat,
DestinationTable: &bigquery.TableReference{
ProjectId: it.projectID,
Expand All @@ -212,12 +258,16 @@ func (it *InsertTask) Insert(reader io.Reader) error {
jobJSON, _ := toolbox.AsIndentJSONText(req)
return fmt.Errorf("failed to submit insert job: %v, %v", jobJSON, err)
}

//store the job reference: after an internal server error the job can be resubmitted with the same id to avoid duplication
it.jobReference = job.JobReference
insertWaitTimeMs := it.manager.Config().GetInt(InsertWaitTimeoutInMsKey, defaultInsertWaitTime)
if insertWaitTimeMs <= 0 {
return nil
}
_, err = waitForJobCompletion(it.service, it.context, it.projectID, job.JobReference.JobId, insertWaitTimeMs)
if _, err = waitForJobCompletion(it.service, it.context, it.projectID, job.JobReference.JobId, insertWaitTimeMs); err == nil {
//no error - reset the job reference
it.jobReference = nil
}
return err
}
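
For context, a sketch of the caller-side retry this enables (hypothetical usage written as if inside this package, not part of this commit; retrying on any error rather than only 5xx is a simplification): because jobReference survives a failed attempt, a resubmission carries the same job id, and BigQuery deduplicates load jobs by id rather than running the same load twice.

```go
package bgc

import "bytes"

// insertWithRetry sketches the pattern this commit enables: on failure the
// second Insert reuses the stored JobReference, so the load job executes at
// most once even if the first response was lost. payload holds the JSON rows.
func insertWithRetry(task *InsertTask, payload []byte) error {
	err := task.Insert(bytes.NewReader(payload))
	if err != nil {
		err = task.Insert(bytes.NewReader(payload)) // same job id -> no duplicate load
	}
	return err
}
```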

@@ -237,15 +287,15 @@ func (it *InsertTask) getRowCount() (int, error) {
func (it *InsertTask) streamRows(rows []*bigquery.TableDataInsertAllRequestRows) (*bigquery.TableDataInsertAllResponse, error) {
var response *bigquery.TableDataInsertAllResponse
var err error
for i := 0; i < it.maxRetries; i++ {
for i := 0; i < it.attempts; i++ {
insertRequest := &bigquery.TableDataInsertAllRequest{}
insertRequest.Rows = rows
requestCall := it.service.Tabledata.InsertAll(it.projectID, it.datasetID, it.tableDescriptor.Table, insertRequest)

if response, err = requestCall.Context(it.context).Do(); isInternalServerError(err) {
log.Printf("retrying %v", err)
log.Printf("no insertIdColumn - duplicates expected")
if i+1 >= it.maxRetries {
if i+1 >= it.attempts {
continue
}
time.Sleep(time.Duration(1+i) * time.Second)
@@ -365,9 +415,9 @@ func (it *InsertTask) waitForData(estimatedRowCount int) error {
func (it *InsertTask) InsertAll(data interface{}) (int, error) {
var count int
var err error
for i := 0; i < it.maxRetries; i++ {
for i := 0; i < it.attempts; i++ {
if count, err = it.insertAll(data); isInternalServerError(err) {
if i+1 >= it.maxRetries {
if i+1 >= it.attempts {
continue
}
time.Sleep(time.Duration(i+1) * time.Second)
@@ -409,14 +459,14 @@ func NewInsertTask(manager dsc.Manager, table *dsc.TableDescriptor, waitForCompl
if _, has := columns[insertIdColumn]; !has {
insertIdColumn = ""
}
maxRetries := config.GetInt(InsertMaxRetires, defaultInsertMaxRetries)
if maxRetries == 0 {
maxRetries = 1
attempts := config.GetInt(InsertMaxRetires, defaultInsertMaxRetries)
if attempts == 0 {
attempts = 1
}
return &InsertTask{
tableDescriptor: table,
service: service,
maxRetries: maxRetries,
attempts: attempts,
context: ctx,
manager: manager,
insertIdColumn: insertIdColumn,
7 changes: 5 additions & 2 deletions query_iterator.go
@@ -12,7 +12,7 @@ import (
)

var useLegacySQL = "/* USE LEGACY SQL */"
var queryPageSize int64 = 500
var queryPageSize = 500
var tickInterval = 100
var doneStatus = "DONE"

@@ -136,7 +136,10 @@ func (qi *QueryIterator) Next() ([]interface{}, error) {

func (qi *QueryIterator) fetchPage() error {
queryResultCall := qi.service.Jobs.GetQueryResults(qi.projectID, qi.jobReferenceID)
queryResultCall.MaxResults(queryPageSize).PageToken(qi.pageToken)

pageSize := qi.manager.Config().GetInt("pageSize", queryPageSize)

queryResultCall.MaxResults(int64(pageSize)).PageToken(qi.pageToken)

jobGetResult, err := queryResultCall.Context(qi.context).Do()
if err != nil {
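
For context, a standalone sketch of the page loop this call drives (not part of this commit; it assumes the query job has already completed and uses only bigquery/v2 calls that appear in this diff):

```go
package example

import (
	"context"
	"fmt"

	bigquery "google.golang.org/api/bigquery/v2"
)

// drainPages fetches a completed query job's results page by page, using the
// same GetQueryResults/MaxResults/PageToken pattern that fetchPage applies.
func drainPages(ctx context.Context, service *bigquery.Service, projectID, jobID string, pageSize int64) error {
	pageToken := ""
	for {
		call := service.Jobs.GetQueryResults(projectID, jobID)
		call.MaxResults(pageSize).PageToken(pageToken)
		result, err := call.Context(ctx).Do()
		if err != nil {
			return err
		}
		fmt.Printf("fetched %d rows\n", len(result.Rows))
		if result.PageToken == "" { // empty token means the last page
			return nil
		}
		pageToken = result.PageToken
	}
}
```
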
2 changes: 1 addition & 1 deletion scanner.go
@@ -16,6 +16,7 @@ func (s *scanner) Columns() ([]string, error) {
}

func (s *scanner) Scan(destinations ...interface{}) error {

if len(destinations) == 1 {
if aMap, ok := destinations[0].(map[string]interface{}); ok {
for i, column := range s.columns {
@@ -29,7 +30,6 @@ func (s *scanner) Scan(destinations ...interface{}) error {
}
return nil
}

}
for i, dest := range destinations {
value := s.Values[i]
