diff --git a/lib/corebulk.go b/lib/corebulk.go index 9c33e9c0..6f1e9763 100644 --- a/lib/corebulk.go +++ b/lib/corebulk.go @@ -137,6 +137,10 @@ func (b *BulkIndexer) Start() { if b.Sender == nil { b.Sender = b.Send } + // Resize bulk channel buffer if max docs greater than default + if b.BulkMaxDocs > BulkMaxDocs { + b.bulkChannel = make(chan []byte, b.BulkMaxDocs) + } // Backwards compatibility b.startHttpSender() b.startDocChannel() @@ -278,9 +282,9 @@ func (b *BulkIndexer) shutdown() { // The index bulk API adds or updates a typed JSON document to a specific index, making it searchable. // it operates by buffering requests, and ocassionally flushing to elasticsearch // http://www.elasticsearch.org/guide/reference/api/bulk.html -func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error { +func (b *BulkIndexer) Index(index string, _type string, id, parent, routing, ttl string, date *time.Time, data interface{}) error { //{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } - by, err := WriteBulkBytes("index", index, _type, id, parent, ttl, date, data) + by, err := WriteBulkBytes("index", index, _type, id, parent, routing, ttl, date, data) if err != nil { return err } @@ -288,9 +292,9 @@ func (b *BulkIndexer) Index(index string, _type string, id, parent, ttl string, return nil } -func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) error { +func (b *BulkIndexer) Update(index string, _type string, id, parent, routing, ttl string, date *time.Time, data interface{}) error { //{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } } - by, err := WriteBulkBytes("update", index, _type, id, parent, ttl, date, data) + by, err := WriteBulkBytes("update", index, _type, id, parent, routing, ttl, date, data) if err != nil { return err } @@ -298,20 +302,28 @@ func (b *BulkIndexer) Update(index string, _type string, id, parent, ttl string, return nil } -func (b *BulkIndexer) Delete(index, _type, id string) { - queryLine := fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q}}\n", index, _type, id) - b.bulkChannel <- []byte(queryLine) +func (b *BulkIndexer) Delete(index, _type, _parent, _routing, id string) { + queryLine := bytes.Buffer{} + queryLine.WriteString(fmt.Sprintf("{\"delete\":{\"_index\":%q,\"_type\":%q,\"_id\":%q", index, _type, id)) + if len(_parent) > 0 { + queryLine.WriteString(fmt.Sprintf(",\"_parent\":%q", _parent)) + } + if len(_routing) > 0 { + queryLine.WriteString(fmt.Sprintf(",\"_routing\":%q", _routing)) + } + queryLine.WriteString("}}\n") + b.bulkChannel <- queryLine.Bytes() return } -func (b *BulkIndexer) UpdateWithWithScript(index string, _type string, id, parent, ttl string, date *time.Time, script string) error { +func (b *BulkIndexer) UpdateWithWithScript(index string, _type string, id, parent, routing, ttl string, date *time.Time, script string) error { var data map[string]interface{} = make(map[string]interface{}) data["script"] = script - return b.Update(index, _type, id, parent, ttl, date, data) + return b.Update(index, _type, id, parent, routing, ttl, date, data) } -func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, ttl string, date *time.Time, partialDoc interface{}, upsert bool) error { +func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, parent, routing, ttl string, date *time.Time, partialDoc interface{}, upsert bool) error { var data map[string]interface{} = make(map[string]interface{}) @@ -319,7 +331,7 @@ func (b *BulkIndexer) UpdateWithPartialDoc(index string, _type string, id, paren if upsert { data["doc_as_upsert"] = true } - return b.Update(index, _type, id, parent, ttl, date, data) + return b.Update(index, _type, id, parent, routing, ttl, date, data) } // This does the actual send of a buffer, which has already been formatted @@ -344,7 +356,12 @@ func (b *BulkIndexer) Send(buf *bytes.Buffer) error { if jsonErr == nil { if response.Errors { atomic.AddUint64(&b.numErrors, uint64(len(response.Items))) - return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]", len(response.Items)) + itemJson, err := json.Marshal(response.Items) + if err == nil { + return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]. Response Items: %s", len(response.Items), itemJson) + } else { + return fmt.Errorf("Bulk Insertion Error. Failed item count [%d]. Error marshalling response items: %s", len(response.Items), err) + } } } return nil @@ -352,7 +369,7 @@ func (b *BulkIndexer) Send(buf *bytes.Buffer) error { // Given a set of arguments for index, type, id, data create a set of bytes that is formatted for bulkd index // http://www.elasticsearch.org/guide/reference/api/bulk.html -func WriteBulkBytes(op string, index string, _type string, id, parent, ttl string, date *time.Time, data interface{}) ([]byte, error) { +func WriteBulkBytes(op string, index string, _type string, id, parent, routing, ttl string, date *time.Time, data interface{}) ([]byte, error) { // only index and update are currently supported if op != "index" && op != "update" { return nil, errors.New(fmt.Sprintf("Operation '%s' is not yet supported", op)) @@ -377,6 +394,12 @@ func WriteBulkBytes(op string, index string, _type string, id, parent, ttl strin buf.WriteString(`"`) } + if len(routing) > 0 { + buf.WriteString(`,"_routing":"`) + buf.WriteString(routing) + buf.WriteString(`"`) + } + if op == "update" { buf.WriteString(`,"_retry_on_conflict":3`) } diff --git a/lib/corebulk_test.go b/lib/corebulk_test.go index e352ae33..44ab2c4f 100644 --- a/lib/corebulk_test.go +++ b/lib/corebulk_test.go @@ -100,7 +100,7 @@ func TestBulkIndexerBasic(t *testing.T) { "date": "yesterday", } - err := indexer.Index(testIndex, "user", "1", "", "", &date, data) + err := indexer.Index(testIndex, "user", "1", "", "", "", &date, data) waitFor(func() bool { return buffers.Length() > 0 }, 5) @@ -112,7 +112,7 @@ func TestBulkIndexerBasic(t *testing.T) { expectedBytes := 129 assert.T(t, totalBytesSent == expectedBytes, fmt.Sprintf("Should have sent %v bytes but was %v", expectedBytes, totalBytesSent)) - err = indexer.Index(testIndex, "user", "2", "", "", nil, data) + err = indexer.Index(testIndex, "user", "2", "", "", "", nil, data) waitFor(func() bool { return buffers.Length() > 1 }, 5) @@ -148,7 +148,7 @@ func TestRefreshParam(t *testing.T) { indexer.Start() <-time.After(time.Millisecond * 20) - indexer.Index("users", "user", "2", "", "", &date, data) + indexer.Index("users", "user", "2", "", "", "", &date, data) <-time.After(time.Millisecond * 200) // indexer.Flush() @@ -174,7 +174,7 @@ func TestWithoutRefreshParam(t *testing.T) { indexer.Start() <-time.After(time.Millisecond * 20) - indexer.Index("users", "user", "2", "", "", &date, data) + indexer.Index("users", "user", "2", "", "", "", &date, data) <-time.After(time.Millisecond * 200) // indexer.Flush() @@ -215,7 +215,7 @@ func XXXTestBulkUpdate(t *testing.T) { data := map[string]interface{}{ "script": "ctx._source.count += 2", } - err = indexer.Update("users", "user", "5", "", "", &date, data) + err = indexer.Update("users", "user", "5", "", "", "", &date, data) // So here's the deal. Flushing does seem to work, you just have to give the // channel a moment to recieve the message ... // <- time.After(time.Millisecond * 20) @@ -261,9 +261,9 @@ func TestBulkSmallBatch(t *testing.T) { indexer.Start() <-time.After(time.Millisecond * 20) - indexer.Index("users", "user", "2", "", "", &date, data) - indexer.Index("users", "user", "3", "", "", &date, data) - indexer.Index("users", "user", "4", "", "", &date, data) + indexer.Index("users", "user", "2", "", "", "", &date, data) + indexer.Index("users", "user", "3", "", "", "", &date, data) + indexer.Index("users", "user", "4", "", "", "", &date, data) <-time.After(time.Millisecond * 200) // indexer.Flush() indexer.Stop() @@ -271,6 +271,47 @@ func TestBulkSmallBatch(t *testing.T) { } +func TestBulkInsertWithMeta(t *testing.T) { + InitTests(true) + var lock sync.Mutex + c := NewTestConn() + indexer := c.NewBulkIndexer(1) + sentBytes := []byte{} + + indexer.Sender = func(buf *bytes.Buffer) error { + lock.Lock() + sentBytes = append(sentBytes, buf.Bytes()...) + lock.Unlock() + return nil + } + + indexer.Start() + + data := map[string]interface{}{ + "name": "smurfs", + "age": 22, + "date": "yesterday", + } + + indexer.Index(testIndex, "user", "1", "p", "", "", nil, data) + indexer.Index(testIndex, "user", "2", "", "r", "", nil, data) + + indexer.Flush() + indexer.Stop() + + lock.Lock() + sent := string(sentBytes) + lock.Unlock() + + expected := `{"index":{"_index":"github","_type":"user","_id":"1","_parent":"p"}} +{"age":22,"date":"yesterday","name":"smurfs"} +{"index":{"_index":"github","_type":"user","_id":"2","_routing":"r"}} +{"age":22,"date":"yesterday","name":"smurfs"} +` + asExpected := sent == expected + assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent)) +} + func TestBulkDelete(t *testing.T) { InitTests(true) var lock sync.Mutex @@ -287,7 +328,7 @@ func TestBulkDelete(t *testing.T) { indexer.Start() - indexer.Delete("fake", "fake_type", "1") + indexer.Delete("fake", "fake_type", "", "", "1") indexer.Flush() indexer.Stop() @@ -302,6 +343,39 @@ func TestBulkDelete(t *testing.T) { assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent)) } +func TestBulkDeleteWithMeta(t *testing.T) { + InitTests(true) + var lock sync.Mutex + c := NewTestConn() + indexer := c.NewBulkIndexer(1) + sentBytes := []byte{} + + indexer.Sender = func(buf *bytes.Buffer) error { + lock.Lock() + sentBytes = append(sentBytes, buf.Bytes()...) + lock.Unlock() + return nil + } + + indexer.Start() + + indexer.Delete("fake", "fake_type", "p", "", "1") + indexer.Delete("fake", "fake_type", "", "r", "1") + + indexer.Flush() + indexer.Stop() + + lock.Lock() + sent := string(sentBytes) + lock.Unlock() + + expected := `{"delete":{"_index":"fake","_type":"fake_type","_id":"1","_parent":"p"}} +{"delete":{"_index":"fake","_type":"fake_type","_id":"1","_routing":"r"}} +` + asExpected := sent == expected + assert.T(t, asExpected, fmt.Sprintf("Should have sent '%s' but actually sent '%s'", expected, sent)) +} + func XXXTestBulkErrors(t *testing.T) { // lets set a bad port, and hope we get a conn refused error? c := NewTestConn() @@ -316,7 +390,7 @@ func XXXTestBulkErrors(t *testing.T) { for i := 0; i < 20; i++ { date := time.Unix(1257894000, 0) data := map[string]interface{}{"name": "smurfs", "age": 22, "date": date} - indexer.Index("users", "user", strconv.Itoa(i), "", "", &date, data) + indexer.Index("users", "user", strconv.Itoa(i), "", "", "", &date, data) } }() var errBuf *ErrorBuffer @@ -356,7 +430,7 @@ func BenchmarkSend(b *testing.B) { about := make([]byte, 1000) rand.Read(about) data := map[string]interface{}{"name": "smurfs", "age": 22, "date": time.Unix(1257894000, 0), "about": about} - indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, data) + indexer.Index("users", "user", strconv.Itoa(i), "", "", "", nil, data) } log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes) if indexer.NumErrors() != 0 { @@ -390,7 +464,7 @@ func BenchmarkSendBytes(b *testing.B) { return indexer.Send(buf) } for i := 0; i < b.N; i++ { - indexer.Index("users", "user", strconv.Itoa(i), "", "", nil, body) + indexer.Index("users", "user", strconv.Itoa(i), "", "", "", nil, body) } log.Printf("Sent %d messages in %d sets totaling %d bytes \n", b.N, sets, totalBytes) if indexer.NumErrors() != 0 { diff --git a/lib/coretest_test.go b/lib/coretest_test.go index db859084..e5a68d2d 100644 --- a/lib/coretest_test.go +++ b/lib/coretest_test.go @@ -179,7 +179,7 @@ func LoadTestData() { log.Println("HM, already exists? ", ge.Url) } docsm[id] = true - indexer.Index(testIndex, ge.Type, id, "", "", &ge.Created, line) + indexer.Index(testIndex, ge.Type, id, "", "", "", &ge.Created, line) docCt++ } else { log.Println("ERROR? ", string(line))