Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion build/package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ RUN CGO_ENABLED=${CGO_ENABLED} go build -trimpath \
-ldflags "-w -s \
-X github.com/ozontech/seq-db/buildinfo.Version=${VERSION} \
-X github.com/ozontech/seq-db/buildinfo.BuildTime=${BUILD_TIME}" \
-o seq-db ./cmd/seq-db
-o ./ ./cmd/...

# Deploy
# Runtime stage: starts from $APP_IMAGE and receives binaries from the `build` stage.
FROM $APP_IMAGE

WORKDIR /seq-db

# Copy each binary individually out of the build stage's /seq-db directory.
COPY --from=build /seq-db/seq-db /seq-db/seq-db
COPY --from=build /seq-db/unpacker /seq-db/unpacker
COPY --from=build /seq-db/inserter /seq-db/inserter

# The relative path resolves against WORKDIR, so the container starts /seq-db/seq-db;
# unpacker and inserter are only shipped alongside it and are not started here.
ENTRYPOINT [ "./seq-db" ]
124 changes: 124 additions & 0 deletions cmd/inserter/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"bufio"
"bytes"
"flag"
"fmt"
"log"
"net/http"
"os"
"time"
)

func main() {
address := flag.String("address", "http://localhost:9002", "Elasticsearch address")
file := flag.String("file", "", "Path to JSONLines file")
batchSize := flag.Int("batch-size", 1000, "Number of documents per batch")
interval := flag.Duration("interval", 0, "Time between inserting batches")

flag.Parse()

if *file == "" {
log.Fatal("file flag is required")
}

f, err := os.Open(*file)
if err != nil {
log.Fatalf("Failed to open file: %v", err)
}
defer f.Close()

scanner := bufio.NewScanner(f)

const maxCapacity = 1024 * 1024 // 1MB buffer
buf := make([]byte, maxCapacity)
scanner.Buffer(buf, maxCapacity) // Set custom buffer and max size

var batch []string
lineNum := 0

Check failure on line 39 in cmd/inserter/main.go

View workflow job for this annotation

GitHub Actions / lint

var lineNum is unused (unused)

for scanner.Scan() {
lineNum++
line := scanner.Text()
if line == "" {
continue
}

// Validate JSON
//var doc map[string]interface{}

Check failure on line 49 in cmd/inserter/main.go

View workflow job for this annotation

GitHub Actions / lint

commentFormatting: put a space between `//` and comment text (gocritic)
//if err := json.Unmarshal([]byte(line), &doc); err != nil {
// log.Printf("Warning: skipping invalid JSON at line %d: %v", lineNum, err)
// continue
//}

// Create index action
action := fmt.Sprintf(`{"index":{}}`)

batch = append(batch, action, line)

if len(batch)/2 >= *batchSize {
if err := sendBatch(*address, batch); err != nil {
log.Printf("Failed to send batch: %v", err)
} else {
log.Printf("Sent %d documents", len(batch)/2)
}
batch = nil

if *interval > 0 {
time.Sleep(*interval)
}
}
}

if err := scanner.Err(); err != nil {
log.Printf("Error reading file: %v", err)
}

// Send remaining batch
if len(batch) > 0 {
if err := sendBatch(*address, batch); err != nil {
log.Printf("Failed to send final batch: %v", err)
} else {
log.Printf("Sent final %d documents", len(batch)/2)
}
}
}

// bulkRetryDelay is how long sendBatch waits after an HTTP 429 before it
// sends the same batch again.
const bulkRetryDelay = 5 * time.Second

// bulkClient is shared by all requests. The timeout bounds a whole exchange
// so that an unresponsive server cannot hang the tool forever.
var bulkClient = &http.Client{Timeout: 2 * time.Minute}

// sendBatch POSTs batch to address as newline-delimited JSON, one element per
// line, each terminated by "\n".
//
// A 429 response is retried after bulkRetryDelay for as long as the server
// keeps answering 429. Any other status >= 400 is returned as an error that
// carries the status code and the response body. Transport errors and
// failures to read the response are returned as errors too.
func sendBatch(address string, batch []string) error {
	// Encode once; every retry re-sends the same bytes.
	var payload bytes.Buffer
	for _, line := range batch {
		payload.WriteString(line)
		payload.WriteByte('\n')
	}

	for {
		status, respBody, err := postBulk(address, payload.Bytes())
		if err != nil {
			return err
		}

		switch {
		case status == http.StatusTooManyRequests:
			log.Printf("429 received. Retrying...")
			time.Sleep(bulkRetryDelay)
		case status >= 400:
			return fmt.Errorf("bulk request failed with status %d: %s", status, respBody)
		default:
			return nil
		}
	}
}

// postBulk performs a single POST of payload to address and returns the
// response status code together with the fully read response body. The body
// is closed before returning, so repeated calls do not accumulate open
// responses.
func postBulk(address string, payload []byte) (int, string, error) {
	req, err := http.NewRequest(http.MethodPost, address, bytes.NewReader(payload))
	if err != nil {
		return 0, "", fmt.Errorf("building bulk request: %w", err)
	}
	// Bulk payloads are newline-delimited JSON.
	req.Header.Set("Content-Type", "application/x-ndjson")

	resp, err := bulkClient.Do(req)
	if err != nil {
		return 0, "", fmt.Errorf("sending bulk request: %w", err)
	}
	defer resp.Body.Close()

	// Reading to EOF also lets the transport reuse the connection.
	var body bytes.Buffer
	if _, err := body.ReadFrom(resp.Body); err != nil {
		return resp.StatusCode, "", fmt.Errorf("reading bulk response (status %d): %w", resp.StatusCode, err)
	}
	return resp.StatusCode, body.String(), nil
}
Loading