DeliverNodeData injects a node state data retrieval response into the queue. The method returns the number of node state entries actually accepted from the delivery, together with an error if the delivery was unsolicited, stale, or contained unrequested data.

func (q *queue) DeliverNodeData(id string, data [][]byte, callback func(error, int)) (int, error) {
	q.lock.Lock()
	defer q.lock.Unlock()

	// Short circuit if the data was never requested
	request := q.statePendPool[id]
	if request == nil {
		return 0, errNoFetchesPending
	}
	delete(q.statePendPool, id)

	// If no data was retrieved, mark their hashes as unavailable for the origin peer
	if len(data) == 0 {
		for hash := range request.Hashes {
			request.Peer.MarkLacking(hash) // mark the entry as unavailable on this peer
		}
	}
	// Iterate over the downloaded data and verify each of them
	accepted, errs := 0, make([]error, 0)
	process := []trie.SyncResult{}
	for _, blob := range data {
		// Skip any state trie entries that were not requested
		hash := common.BytesToHash(crypto.Keccak256(blob))
		if _, ok := request.Hashes[hash]; !ok {
			errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
			continue
		}
		// Inject the next state trie item into the processing queue
		process = append(process, trie.SyncResult{Hash: hash, Data: blob})
		accepted++

		delete(request.Hashes, hash)
		delete(q.stateTaskPool, hash)
	}
	// Start the asynchronous node state data injection
	atomic.AddInt32(&q.stateProcessors, 1)
	go func() {
		defer atomic.AddInt32(&q.stateProcessors, -1)
		q.deliverNodeData(process, callback)
	}()
	// Return all failed or missing fetches to the queue
	for hash, index := range request.Hashes {
		q.stateTaskQueue.Push(hash, float32(index))
	}
	// If none of the data items were good, it's a stale delivery
	switch {
	case len(errs) == 0:
		return accepted, nil
	case len(errs) == len(request.Hashes):
		return accepted, errStaleDelivery
		return accepted, fmt.Errorf("multiple failures: %v", errs)
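
The core of the method is a verify-then-accept loop: every delivered blob is re-hashed and only counted if its hash is still outstanding, while anything left unfulfilled is pushed back onto the task queue. The standalone sketch below illustrates that pattern in isolation; it uses SHA-256 from the standard library as a stand-in for Keccak256 and a plain map instead of the queue's task pool, so none of the names in it come from the repository.

package main

import (
	"crypto/sha256"
	"fmt"
)

// deliver mimics the accept/reject decision above: re-hash every blob and
// accept it only if that hash is still outstanding; the caller re-queues
// whatever remains in the requested map afterwards.
func deliver(requested map[[32]byte]int, data [][]byte) (accepted int, errs []error) {
	for _, blob := range data {
		hash := sha256.Sum256(blob)
		if _, ok := requested[hash]; !ok {
			errs = append(errs, fmt.Errorf("non-requested state data %x", hash))
			continue
		}
		accepted++
		delete(requested, hash)
	}
	return accepted, errs
}

func main() {
	blob := []byte("node data")
	requested := map[[32]byte]int{sha256.Sum256(blob): 0}
	accepted, errs := deliver(requested, [][]byte{blob, []byte("bogus")})
	fmt.Println("accepted:", accepted, "errors:", errs)
}

As in the method above, partial deliveries are tolerated: the accepted count is reported even when some blobs fail verification, which lets the caller distinguish a completely stale delivery from a partially useful one.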