In the previous article we saw that, when a file is read, the content of the whole file is obtained by calling the `streamBytes` function, which returns the requested content based on the offset, the length, and the file's array of linked nodes. The through stream generated by the `pull` function that `streamBytes` returns is the stream we want to read; it is ultimately passed to the through stream generated by the `pull` function in IPFS `core/components/get-pull-stream.js`. That stream is then converted into a complete buffer by the handler of the `asyncMap` stream of the pull-stream library in the `get.js` file in the same directory, ready for use by the final application. The code is as follows:
```javascript
pull(
  self.getPullStream(ipfsPath, options),
  pull.asyncMap((file, cb) => {
    if (file.content) {
      pull(
        file.content,
        pull.collect((err, buffers) => {
          if (err) {
            return cb(err)
          }

          file.content = Buffer.concat(buffers)
          cb(null, file)
        })
      )
    } else {
      cb(null, file)
    }
  }),
  pull.collect(callback)
)
```
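The key step in this handler is `Buffer.concat`, which joins the collected chunk buffers into one buffer. A minimal standalone illustration:

```javascript
// Joining chunk buffers, as the asyncMap handler above does for file.content.
const buffers = [Buffer.from('hel'), Buffer.from('lo, '), Buffer.from('IPFS')]
const content = Buffer.concat(buffers)

console.log(content.toString()) // "hello, IPFS"
```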
That concludes our recap of the previous article. Now let's take a closer look at how the `streamBytes` function and the related depth-first traversal are implemented.

The `streamBytes` function uses the pull-traverse library, which provides depth-first, breadth-first, and leaf-first traversal algorithms; each of them returns a through stream of the pull library, which is read by the stream behind it. Depth-first traversal is used here, and the returned stream is read by the `map` stream of the pull library to obtain each element.
The relevant code of the depth-first algorithm is as follows:
```javascript
var once = exports.once = function (value) {
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (value != null) {
      var _value = value
      value = null
      cb(null, _value)
    } else {
      cb(true)
    }
  }
}

var depthFirst = exports.depthFirst = function (start, createStream) {
  var reads = [], ended

  reads.unshift(once(start))

  return function next (end, cb) {
    if (!reads.length) return cb(true)
    if (ended) return cb(ended)

    reads[0](end, function (end, data) {
      if (end) {
        if (end !== true) {
          ended = end
          reads.shift()

          while (reads.length) reads.shift()(end, function () {})

          return cb(end)
        }

        reads.shift()
        return next(null, cb)
      }

      reads.unshift(createStream(data))
      cb(end, data)
    })
  }
}
```

The `streamBytes` function is defined in the `file.js` file; let's look at its contents:
```javascript
function streamBytes (dag, node, fileSize, offset, length) {
  if (offset === fileSize || length === 0) {
    return once(Buffer.alloc(0))
  }

  const end = offset + length

  return pull(
    traverse.depthFirst({
      node,
      start: 0,
      end: fileSize
    }, getChildren(dag, offset, end)),
    map(extractData(offset, end)),
    filter(Boolean)
  )
}
```

As the depth-first algorithm code shows, the first argument is wrapped into a `once` stream of the pull library; here our root DAG node is wrapped into a `once` stream and placed as the first element of the internal `reads` array. Finally, a through stream of the pull library is returned.
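To see the traversal order concretely, here is a runnable, dependency-free sketch: `once` and `depthFirst` are copied from the snippet above (without the `exports` assignments), and `values` is a hypothetical helper that streams an array of children.

```javascript
// Emit a single value, then end.
function once (value) {
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (value != null) {
      var _value = value
      value = null
      cb(null, _value)
    } else cb(true)
  }
}

// Emit the elements of an array one by one, then end.
function values (arr) {
  var i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= arr.length) return cb(true)
    cb(null, arr[i++])
  }
}

function depthFirst (start, createStream) {
  var reads = [], ended
  reads.unshift(once(start))
  return function next (end, cb) {
    if (!reads.length) return cb(true)
    if (ended) return cb(ended)
    reads[0](end, function (end, data) {
      if (end) {
        if (end !== true) {
          ended = end
          reads.shift()
          while (reads.length) reads.shift()(end, function () {})
          return cb(end)
        }
        reads.shift()
        return next(null, cb)
      }
      // Push a stream over this node's children so they are read next (depth first).
      reads.unshift(createStream(data))
      cb(end, data)
    })
  }
}

const tree = {
  value: 1,
  children: [
    { value: 2, children: [] },
    { value: 3, children: [{ value: 4, children: [] }] }
  ]
}

const read = depthFirst(tree, node => values(node.children))
const order = []

// Drain the stream: each node is visited before its siblings' subtrees.
function drain (end, data) {
  if (end) return
  order.push(data.value)
  read(null, drain)
}

read(null, drain)
console.log(order) // [ 1, 2, 3, 4 ]
```

Node `1` is emitted first, then its first subtree (`2`), then the second subtree (`3`, then `4`), which is exactly the preorder that `streamBytes` relies on to deliver file bytes in order.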
A function of the form `function next (end, cb) {}` is called a read function; the through streams of the pull library return functions of this shape. A read function is called by the downstream stream to read data from it; when data has been read, the read function passes the data through the callback specified in its arguments, thereby handing it to the stream that called it.
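The read-function protocol can be shown with a minimal, dependency-free sketch (hand-rolled for illustration, not the pull-stream library itself):

```javascript
// A pull-stream source is just a function read(end, cb(end, data)).
function valuesSource (arr) {
  let i = 0
  return function read (abort, cb) {
    if (abort) return cb(abort)          // caller asked us to stop
    if (i >= arr.length) return cb(true) // `true` signals a normal end
    cb(null, arr[i++])                   // hand one datum to the caller
  }
}

const read = valuesSource(['a', 'b', 'c'])
const out = []

// The downstream repeatedly calls the read function until it signals end.
function drain (end, data) {
  if (end) return
  out.push(data)
  read(null, drain)
}

read(null, drain)
console.log(out.join('')) // "abc"
```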
When the through stream returned by the `map` function of the pull library calls the read function returned by the depth-first traversal function, that read function executes as follows:
- If there is no data left in the internal array to read, the callback supplied by the `map` stream is called with `true` and the function returns: `if (!reads.length) return cb(true)`
- If the `ended` variable is truthy, the callback supplied by the `map` stream is called with this variable as the argument and the function returns: `if (ended) return cb(ended)`
- Otherwise, the first element of the internal array (the read function of a pull-library through stream) is called to read data. The custom internal callback then processes the result. If the read has ended, i.e. `end` is truthy, and `end` is not strictly `true` (which happens when `end` carries an error rather than the boolean `true`), then: the variable `ended` is set to the value of `end`; the first element is removed from the array; every remaining element (each a read function) is removed in turn and called with the error so it can abort; and finally the callback is invoked with the error. If instead `end` is strictly `true`, only the first element is removed from the array, because the current element has been fully consumed, and `next` is called again to continue reading from the rest of the array.
  The corresponding code:

```javascript
if (end) {
  if (end !== true) {
    ended = end
    reads.shift()

    while (reads.length) reads.shift()(end, function () {})

    return cb(end)
  }

  reads.shift()
  return next(null, cb)
}
```
If the read has not ended, the data that was just read is passed to the `createStream` function, and the resulting stream is pushed onto the front of the array (`reads.unshift(createStream(data))`) before the data itself is handed to the callback. This `createStream` function is the internal function returned by `getChildren`, which we pass as the second argument to the depth-first traversal algorithm. The internal function returned by `getChildren` ultimately returns a through stream generated by the `pull` function. In this stream, the `flatten` stream of the pull-stream library converts each node and its child nodes from an array-of-arrays form such as
[1, 2, 3], [4, 5, 6], [7, 8]
This form is transformed into the following form
[1, 2, 3, 4, 5, 6, 7, 8]
Here `[1, 2, 3]` can be regarded as the first Link fragment, which has three DAG nodes containing the final data beneath it; `[4, 5, 6]` is the second Link fragment, with three data-bearing DAG nodes beneath it; and `[7, 8]` is the third Link fragment, with only two DAG nodes containing the final data.
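For illustration, the same merge can be reproduced with plain JavaScript (this is only an analogy; `flatten` in pull-stream operates on stream items rather than an in-memory array):

```javascript
// Three Link fragments, each holding the DAG nodes beneath it.
const fragments = [[1, 2, 3], [4, 5, 6], [7, 8]]

// flatten() emits the items of each sub-array in order; on plain arrays,
// Array.prototype.flat performs the equivalent merge.
const merged = fragments.flat()

console.log(merged) // [ 1, 2, 3, 4, 5, 6, 7, 8 ]
```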
We can see that, through the depth-first algorithm and the internal function returned by its handler `getChildren`, we obtain each fragment and its saved sub-fragments separately and arrange them in the correct order, reassembling the DAG node's complete data. The internal function returned by the `getChildren` function proceeds as follows:
- If the current node object is a buffer, the current node is a leaf node, so an empty stream is returned directly, since a leaf cannot be traversed further.

```javascript
if (Buffer.isBuffer(node)) {
  return empty()
}
```
- Call the static `UnixFS.unmarshal` method to convert the current node object into a file object, then compute the byte range each link covers and keep only the links that overlap the requested range.

```javascript
let file

try {
  file = UnixFS.unmarshal(node.data)
} catch (err) {
  return error(err)
}

const nodeHasData = Boolean(file.data && file.data.length)

if (nodeHasData && node.links.length) {
  streamPosition += file.data.length
}

const filteredLinks = node.links
  .map((link, index) => {
    const child = {
      link: link,
      start: streamPosition,
      end: streamPosition + file.blockSizes[index],
      size: file.blockSizes[index]
    }

    streamPosition = child.end

    return child
  })
  .filter((child) => {
    return (offset >= child.start && offset < child.end) || // child has offset byte
      (end > child.start && end <= child.end) || // child has end byte
      (offset < child.start && end > child.end) // child is between offset and end bytes
  })

if (filteredLinks.length) {
  streamPosition = filteredLinks[0].start
}
```
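The range filter can be isolated into a standalone sketch (`filterLinks` is a hypothetical re-implementation for illustration, not a function in the source) that shows which links survive for a given byte range:

```javascript
// Given the block size of each link and the stream position of the first one,
// keep the links whose [start, end) byte range overlaps [offset, end).
function filterLinks (blockSizes, startPos, offset, end) {
  let streamPosition = startPos
  return blockSizes
    .map((size, index) => {
      const child = { index, start: streamPosition, end: streamPosition + size, size }
      streamPosition = child.end
      return child
    })
    .filter(child =>
      (offset >= child.start && offset < child.end) || // child has the offset byte
      (end > child.start && end <= child.end) ||       // child has the end byte
      (offset < child.start && end > child.end))       // child lies inside the range
}

// Three 10-byte blocks; requesting bytes [5, 25) touches all three of them,
// while requesting bytes [0, 10) touches only the first.
console.log(filterLinks([10, 10, 10], 0, 5, 25).map(c => c.index)) // [ 0, 1, 2 ]
console.log(filterLinks([10, 10, 10], 0, 0, 10).map(c => c.index)) // [ 0 ]
```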
- Finally, a through stream generated by the `pull` function is returned.
```javascript
return pull(
  once(filteredLinks),
  paramap((children, cb) => {
    dag.getMany(children.map(child => child.link.cid), (err, results) => {
      if (err) {
        return cb(err)
      }

      cb(null, results.map((result, index) => {
        const child = children[index]

        return {
          start: child.start,
          end: child.end,
          node: result,
          size: child.size
        }
      }))
    })
  }),
  flatten()
)
```
In this stream, the stream returned by the `paramap` function calls the one-shot stream returned by the `once` function, which passes the array of Link information to it. The `paramap` handler processes the Link info array as a whole: there is only one `children` array, containing all of the Link information, rather than multiple `children` arrays. Inside the handler, the `getMany` method of the IPLD object is called to fetch the data of each Link node; the returned data is put in order, and the callback specified by the next stream's read function (the `flatten` stream) is then invoked with the ordered array. Finally, after the array is flattened by the `flatten` stream, it is passed to the stream in the outer `pull` function, that is, the `map` stream of the pull library we saw earlier. In that stream's handler, the internal function returned by the `extractData` function is called to process each node object. This internal function is relatively simple: it mainly extracts the relevant bytes from each fragment and returns the corresponding buffer. Its code is as follows; readers can analyze it themselves, so we will not elaborate here.
```javascript
function getData ({ node, start, end }) {
  let block

  if (Buffer.isBuffer(node)) {
    block = node
  } else {
    try {
      const file = UnixFS.unmarshal(node.data)

      if (!file.data) {
        if (file.blockSizes.length) {
          return
        }

        return Buffer.alloc(0)
      }

      block = file.data
    } catch (err) {
      throw new Error(`Failed to unmarshal node - ${err.message}`)
    }
  }

  if (block && block.length) {
    if (streamPosition === -1) {
      streamPosition = start
    }

    const output = extractDataFromBlock(block, streamPosition, requestedStart, requestedEnd)

    streamPosition += block.length

    return output
  }

  return Buffer.alloc(0)
}
```
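The `extractDataFromBlock` helper is not shown in this article. A plausible, simplified sketch (an assumption for illustration, not the actual source) would slice out the portion of a block that overlaps the requested byte range:

```javascript
// Hypothetical sketch of extractDataFromBlock: `blockStart` is the block's
// absolute position in the file stream; return only the bytes of `block`
// that fall inside [requestedStart, requestedEnd).
function extractDataFromBlock (block, blockStart, requestedStart, requestedEnd) {
  const blockLength = block.length

  if (requestedStart >= blockStart + blockLength) return Buffer.alloc(0) // range is after this block
  if (requestedEnd <= blockStart) return Buffer.alloc(0)                 // range is before this block

  const start = Math.max(0, requestedStart - blockStart)
  const end = Math.min(blockLength, requestedEnd - blockStart)

  return block.slice(start, end)
}

// Bytes [2, 5) of a block starting at stream position 0.
console.log(extractDataFromBlock(Buffer.from('abcdefgh'), 0, 2, 5).toString()) // "cde"
// Bytes [12, 14) of a block starting at stream position 10.
console.log(extractDataFromBlock(Buffer.from('ijkl'), 10, 12, 14).toString()) // "kl"
```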