A BulkIndexer creates goroutines and channels for connecting and sending data to Elasticsearch in bulk, using buffers.


type BulkIndexer struct {
	conn *Conn

	// We are creating a variable defining the func responsible for sending
	// to allow a mock sender for test purposes
	Sender func(*bytes.Buffer) error

	// The refresh parameter can be set to true in order to refresh the
	// relevant primary and replica shards immediately after the bulk
	// operation has occurred
	Refresh bool

	// If we encounter an error in sending, we are going to retry for this long
	// before returning an error
	// if 0 it will not retry
	RetryForSeconds int

	// channel for getting errors
	ErrorChannel chan *ErrorBuffer

	// channel for sending to background indexer
	bulkChannel chan []byte

	// numErrors is a running total of errors seen
	numErrors uint64

	// shutdown channel
	shutdownChan chan chan struct{}
	// channel to shutdown timer
	timerDoneChan chan struct{}

	// Channel to send a complete bytes.Buffer to the HTTP sender
	sendBuf chan *bytes.Buffer
	// byte buffer for docs that have been converted to bytes, but not yet sent
	buf *bytes.Buffer
	// Max amount of time to buffer before forcing a flush
	BufferDelayMax time.Duration
	// Max buffer size in bytes before flushing to elasticsearch
	BulkMaxBuffer int // 1048576
	// Max number of Docs to hold in buffer before forcing flush
	BulkMaxDocs int // 100

	// Number of documents we have sent through so far on this session
	docCt int
	// Max number of http conns in flight at one time
	maxConns int
	// If we are indexing enough docs per BufferDelayMax, we won't need to do
	// time-based eviction; otherwise we do.
	needsTimeBasedFlush bool
	// Lock for document writes/operations
	mu sync.Mutex
	// Wait Group for the http sends
	sendWg *sync.WaitGroup