Skip to content

Commit

Permalink
added job reference for retries
Browse files Browse the repository at this point in the history
  • Loading branch information
[email protected] authored and [email protected] committed Aug 7, 2019
1 parent 1ade607 commit 38c39cb
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 11 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## August 7 2019 0.7.2
* Update logic for retrying jobs with shared job reference

## August 5 2019 0.7.1
* Added pageSize config parameter to control MaxResult on query interator

Expand Down
29 changes: 18 additions & 11 deletions insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type InsertTask struct {
manager dsc.Manager
insertMethod string
insertIdColumn string
maxRetries int
attempts int
columns map[string]dsc.Column
jobReference *bigquery.JobReference
}

//InsertSingle streams single records into big query.
Expand Down Expand Up @@ -191,8 +192,10 @@ func (it *InsertTask) LoadAll(data interface{}) (int, error) {
//InsertAll streams all records into big query, returns number records streamed or error.
func (it *InsertTask) Insert(reader io.Reader) error {
req := &bigquery.Job{
JobReference: it.jobReference,
Configuration: &bigquery.JobConfiguration{
Load: &bigquery.JobConfigurationLoad{

SourceFormat: jsonFormat,
DestinationTable: &bigquery.TableReference{
ProjectId: it.projectID,
Expand All @@ -212,12 +215,16 @@ func (it *InsertTask) Insert(reader io.Reader) error {
jobJSON, _ := toolbox.AsIndentJSONText(req)
return fmt.Errorf("failed to submit insert job: %v, %v", jobJSON, err)
}

//store job reference in case internal server error, you may recommit job with the same id to avoid duplication
it.jobReference = job.JobReference
insertWaitTimeMs := it.manager.Config().GetInt(InsertWaitTimeoutInMsKey, defaultInsertWaitTime)
if insertWaitTimeMs <= 0 {
return nil
}
_, err = waitForJobCompletion(it.service, it.context, it.projectID, job.JobReference.JobId, insertWaitTimeMs)
if _, err = waitForJobCompletion(it.service, it.context, it.projectID, job.JobReference.JobId, insertWaitTimeMs); err == nil {
//no error reset job referenc
it.jobReference = nil
}
return err
}

Expand All @@ -237,15 +244,15 @@ func (it *InsertTask) getRowCount() (int, error) {
func (it *InsertTask) streamRows(rows []*bigquery.TableDataInsertAllRequestRows) (*bigquery.TableDataInsertAllResponse, error) {
var response *bigquery.TableDataInsertAllResponse
var err error
for i := 0; i < it.maxRetries; i++ {
for i := 0; i < it.attempts; i++ {
insertRequest := &bigquery.TableDataInsertAllRequest{}
insertRequest.Rows = rows
requestCall := it.service.Tabledata.InsertAll(it.projectID, it.datasetID, it.tableDescriptor.Table, insertRequest)

if response, err = requestCall.Context(it.context).Do(); isInternalServerError(err) {
log.Printf("retrying %v", err)
log.Printf("no insertIdColumn - duplicates expected")
if i+i >= it.maxRetries {
if i+i >= it.attempts {
continue
}
time.Sleep(time.Duration(1+i) * time.Second)
Expand Down Expand Up @@ -365,9 +372,9 @@ func (it *InsertTask) waitForData(estimatedRowCount int) error {
func (it *InsertTask) InsertAll(data interface{}) (int, error) {
var count int
var err error
for i := 0; i < it.maxRetries; i++ {
for i := 0; i < it.attempts; i++ {
if count, err = it.insertAll(data); isInternalServerError(err) {
if i+i >= it.maxRetries {
if i+i >= it.attempts {
continue
}
time.Sleep(time.Duration(i+1) * time.Second)
Expand Down Expand Up @@ -409,14 +416,14 @@ func NewInsertTask(manager dsc.Manager, table *dsc.TableDescriptor, waitForCompl
if _, has := columns[insertIdColumn]; !has {
insertIdColumn = ""
}
maxRetries := config.GetInt(InsertMaxRetires, defaultInsertMaxRetries)
if maxRetries == 0 {
maxRetries = 1
attempts := config.GetInt(InsertMaxRetires, defaultInsertMaxRetries)
if attempts == 0 {
attempts = 1
}
return &InsertTask{
tableDescriptor: table,
service: service,
maxRetries: maxRetries,
attempts: attempts,
context: ctx,
manager: manager,
insertIdColumn: insertIdColumn,
Expand Down

0 comments on commit 38c39cb

Please sign in to comment.