Proficient in IPFS: IPFS Get (Part 2)

In the previous article, "IPFS Get (Part 1)", we saw that the streamBytes function is called to fetch the specified content based on an offset, a length, and the file's DAG node. When an entire file is fetched, its full content is likewise obtained through streamBytes. The through stream generated by the pull function inside streamBytes is the stream we ultimately read from; it is passed to the through stream generated by the pull function in core/components/get-pull-stream.js. Then, in get.js in the same directory, the handler of an asyncMap stream from the pull-stream library concatenates this stream into one complete buffer for the final application to use. The code is as follows:

```js
pull(
  self.getPullStream(ipfsPath, options),
  pull.asyncMap((file, cb) => {
    if (file.content) {
      pull(
        file.content,
        pull.collect((err, buffers) => {
          if (err) { return cb(err) }
          file.content = Buffer.concat(buffers)
          cb(null, file)
        })
      )
    } else {
      cb(null, file)
    }
  }),
  pull.collect(callback)
)
```
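To see what that asyncMap handler accomplishes, here is a minimal, self-contained sketch. The `values` and `collect` functions are simplified stand-ins for the pull-stream primitives (not the library's actual code); the sketch drains a pull-style source of Buffer chunks and concatenates them the same way get.js does.

```javascript
// A source: a read function (end, cb) over an array of chunks.
function values (chunks) {
  let i = 0
  return function read (end, cb) {
    if (end) return cb(end)
    if (i >= chunks.length) return cb(true) // true signals a clean end
    cb(null, chunks[i++])
  }
}

// A sink: keeps calling the source's read function until it ends,
// then hands the accumulated chunks to the callback.
function collect (read, done) {
  const buffers = []
  read(null, function next (end, data) {
    if (end === true) return done(null, buffers)
    if (end) return done(end)
    buffers.push(data)
    read(null, next)
  })
}

collect(values([Buffer.from('he'), Buffer.from('llo')]), (err, buffers) => {
  if (err) throw err
  const content = Buffer.concat(buffers) // what get.js stores in file.content
  console.log(content.toString()) // -> hello
})
```

The real pull.collect works the same way: it only fires its callback once the upstream read function signals a clean end.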

That wraps up our recap of the previous article. Now let's take a closer look at how the streamBytes function and the depth-first traversal it relies on are implemented.

The streamBytes function uses the pull-traverse library, which provides depth-first, breadth-first, and leaf-first traversal algorithms. Each of them returns a through stream in the pull-stream style, which is driven by the stream downstream of it. Here the depth-first algorithm is used, and the stream it returns is driven by a pull-stream map stream to obtain each element.

The relevant code of the depth-first algorithm, together with the once helper it builds on, is as follows:

```js
var once = exports.once = function (value) {
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (value != null) {
      var _value = value; value = null
      cb(null, _value)
    } else {
      cb(true)
    }
  }
}
```
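A quick way to confirm once's contract, before moving on, is to call the read function it returns twice: the first read yields the value, every later read signals a clean end. A small sketch (once transcribed from the pull-traverse source above):

```javascript
// `once` transcribed from the pull-traverse source above.
function once (value) {
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (value != null) {
      var _value = value; value = null
      cb(null, _value)
    } else {
      cb(true)
    }
  }
}

const read = once('root')
read(null, (end, data) => console.log(end, data)) // -> null root
read(null, (end, data) => console.log(end, data)) // -> true undefined
```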

```js
var depthFirst = exports.depthFirst = function (start, createStream) {
  var reads = [], ended

  reads.unshift(once(start))

  return function next (end, cb) {
    if (!reads.length) return cb(true)
    if (ended) return cb(ended)

    reads[0](end, function (end, data) {
      if (end) {
        if (end !== true) {
          ended = end
          reads.shift()

          while (reads.length)
            reads.shift()(end, function () {})

          return cb(end)
        }

        reads.shift()
        return next(null, cb)
      }

      reads.unshift(createStream(data))
      cb(end, data)
    })
  }
}
```

The streamBytes function itself is defined in file.js; let's look at its contents:

```js
function streamBytes (dag, node, fileSize, offset, length) {
  if (offset === fileSize || length === 0) {
    return once(Buffer.alloc(0))
  }

  const end = offset + length

  return pull(
    traverse.depthFirst({
      node,
      start: 0,
      end: fileSize
    }, getChildren(dag, offset, end)),
    map(extractData(offset, end)),
    filter(Boolean)
  )
}
```

From the depth-first code above we can see that the first argument is wrapped into a once stream of the pull-stream family — here it is an object holding our root DAG node — and placed as the first element of the internal reads array. The function then returns a through stream of the pull-stream library, namely the next function.
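To make the traversal order concrete, here is a self-contained sketch that drives depthFirst over a small in-memory tree. The once and depthFirst functions are transcribed from pull-traverse above; `values` is a simplified array source and the tree shape is invented for the example.

```javascript
function once (value) {
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (value != null) {
      const v = value
      value = null
      cb(null, v)
    } else {
      cb(true)
    }
  }
}

// values: turn an array into a pull-style source.
function values (array) {
  let i = 0
  return function (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

function depthFirst (start, createStream) {
  const reads = []
  let ended
  reads.unshift(once(start))
  return function next (end, cb) {
    if (!reads.length) return cb(true)
    if (ended) return cb(ended)
    reads[0](end, function (end, data) {
      if (end) {
        if (end !== true) {
          ended = end
          reads.shift()
          while (reads.length) reads.shift()(end, function () {})
          return cb(end)
        }
        reads.shift()
        return next(null, cb)
      }
      reads.unshift(createStream(data))
      cb(end, data)
    })
  }
}

// A toy DAG: each node has a name and children.
const tree = {
  name: 'root',
  children: [
    { name: 'a', children: [{ name: 'a1', children: [] }] },
    { name: 'b', children: [] }
  ]
}

const read = depthFirst(tree, node => values(node.children))
const order = []
read(null, function next (end, node) {
  if (end) return
  order.push(node.name)
  read(null, next)
})
console.log(order.join(',')) // -> root,a,a1,b
```

Each emitted node's children are pushed to the front of the reads array, which is exactly what makes the traversal depth-first.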

A function of the shape function next (end, cb) { ... } returned in this way is what we call a through stream of the pull-stream library; such a function is known as a read function. It is invoked by the stream behind it to read data from this stream. Once the data has been read, the function passes it to the subsequent stream via the callback cb supplied in its arguments, i.e. it hands the data back to the stream that called it.
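This calling convention can be sketched with a toy map through stream (simplified stand-ins, not pull-stream's actual code): the downstream consumer calls the through's read function, which calls the upstream read function, transforms the data, and forwards it through cb.

```javascript
// A source over an array.
function values (array) {
  let i = 0
  return (abort, cb) => {
    if (abort) return cb(abort)
    i < array.length ? cb(null, array[i++]) : cb(true)
  }
}

// A through: takes the upstream read function, returns a new one.
function map (fn) {
  return read => (end, cb) => {
    read(end, (end, data) => {
      if (end) return cb(end) // propagate end or error downstream
      cb(null, fn(data))
    })
  }
}

// Wire source -> through, then drain from the downstream side.
const read = map(x => x * 2)(values([1, 2, 3]))
const out = []
read(null, function next (end, data) {
  if (end) return console.log(out.join(',')) // -> 2,4,6
  out.push(data)
  read(null, next)
})
```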

When the through stream returned by the pull-stream map function calls the read function returned by the depth-first traversal, that read function executes as follows:

  1. If there is nothing left to read in the internal reads array, the caller's callback is invoked with true to signal a clean end, and the function returns.

     if (!reads.length) return cb(true)

  2. If the ended variable has been set, the caller's callback is invoked with it as the argument, and the function returns.

     if (ended) return cb(ended)
  3. Otherwise, the first element of the internal array — itself a pull-stream read function — is called to read data. When that read completes, a custom internal callback processes the result as follows:
    • If the read has ended, i.e. end is truthy, the following logic runs. If end is not strictly true (which happens when it carries an error), then: the ended variable is set to end; the first element is removed from the array; while the array is still not empty, each remaining element (a read function) is removed and invoked with end so that it can abort; finally the caller's callback is invoked with end. If end is strictly true, the current element of the array has been fully consumed, so it is simply removed and the outer next function is called again to continue reading from the remaining elements.

     if (end) {
       if (end !== true) {
         ended = end
         reads.shift()

         while (reads.length)
           reads.shift()(end, function () {})

         return cb(end)
       }

       reads.shift()
       return next(null, cb)
     }

  • Otherwise, the createStream function is called on the data just read, and the resulting stream is pushed onto the front of the reads array. This createStream is the internal function returned by getChildren, the second argument of the depth-first traversal. The function returned by getChildren in turn returns a through stream generated by pull; in that stream, the flatten stream of the pull-stream library converts each node and its child nodes from a sequence of arrays, such as
     [1, 2, 3], [4, 5, 6], [7, 8] 

    This form is transformed into the following form

     [1, 2, 3, 4, 5, 6, 7, 8] 

    Here [1, 2, 3] can be thought of as the first Link fragment, with three DAG nodes holding the final data beneath it; [4, 5, 6] is the second Link fragment, likewise with three data-holding DAG nodes beneath it; [7, 8] is the third Link fragment, with only two.

  • In other words, through the depth-first algorithm and the internal stream returned by the getChildren handler, each fragment and the sub-fragments it holds are fetched in turn and arranged in the correct order into an array that makes up the DAG node's complete data. The internal function returned by getChildren proceeds as follows:

    1. If the current node object is a Buffer, the node is a leaf, so an empty stream is returned directly because there is nothing further to traverse.

       if (Buffer.isBuffer(node)) {
         return empty()
       }
    2. Otherwise, call the static method UnixFS.unmarshal to convert the current node's data into a file object.

       let file

       try {
         file = UnixFS.unmarshal(node.data)
       } catch (err) {
         return error(err)
       }

  • Adjust the current stream position: if the node itself carries data and also has links, advance streamPosition past that inline data.

     const nodeHasData = Boolean(file.data && file.data.length)

     if (nodeHasData && node.links.length) {
       streamPosition += file.data.length
     }
  • Process the Link entries contained in the current node: compute the absolute byte range each child covers, filter out the Links that fall outside the requested range, and keep the remaining Link information in order.

     const filteredLinks = node.links
       .map((link, index) => {
         const child = {
           link: link,
           start: streamPosition,
           end: streamPosition + file.blockSizes[index],
           size: file.blockSizes[index]
         }

         streamPosition = child.end

         return child
       })
       .filter((child) => {
         return (offset >= child.start && offset < child.end) || // child has offset byte
           (end > child.start && end <= child.end) || // child has end byte
           (offset < child.start && end > child.end) // child is between offset and end bytes
       })
  • If the filtered Link array is non-empty, reset the stream position to the start of its first entry.

     if (filteredLinks.length) {
       streamPosition = filteredLinks[0].start
     }
  • Finally, return a stream composed with the pull function.

     return pull(
       once(filteredLinks),
       paramap((children, cb) => {
         dag.getMany(children.map(child => child.link.cid), (err, results) => {
           if (err) {
             return cb(err)
           }

           cb(null, results.map((result, index) => {
             const child = children[index]

             return {
               start: child.start,
               end: child.end,
               node: result,
               size: child.size
             }
           }))
         })
       }),
       flatten()
     )

    In this stream, the stream returned by the paramap function drives the one-shot stream returned by once, which hands it the filtered Link array. Note that there is exactly one children array containing all of the Link information, not several. In the paramap handler, the getMany method of the IPLD object fetches the data of each linked node; the results are mapped back, in order, into objects carrying each child's start, end, size and resolved node, and the callback passes this array on to the next stream, the flatten stream. After the array is flattened, each element is handed to the outer pull pipeline, that is, to the map stream of the pull-stream library we saw earlier in streamBytes. Its handler is the internal function returned by extractData(offset, end). That function, getData, is relatively simple: for each fragment it extracts the bytes that fall within the requested range and returns the corresponding buffer. Its code is as follows; the reader can analyze it in detail.

```js
function getData ({ node, start, end }) {
  let block

  if (Buffer.isBuffer(node)) {
    block = node
  } else {
    try {
      const file = UnixFS.unmarshal(node.data)

      if (!file.data) {
        if (file.blockSizes.length) {
          return
        }

        return Buffer.alloc(0)
      }

      block = file.data
    } catch (err) {
      throw new Error(`Failed to unmarshal node - ${err.message}`)
    }
  }

  if (block && block.length) {
    if (streamPosition === -1) {
      streamPosition = start
    }

    const output = extractDataFromBlock(block, streamPosition, requestedStart, requestedEnd)

    streamPosition += block.length

    return output
  }

  return Buffer.alloc(0)
}
```
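getData relies on extractDataFromBlock, which is not shown above. The sketch below is a plausible reimplementation (an assumption modelled on the js-ipfs UnixFS exporter, not necessarily the exact source): given a block and its absolute start offset within the file, it slices out just the bytes that fall inside the requested [requestedStart, requestedEnd) range.

```javascript
// Hypothetical sketch of extractDataFromBlock: slice the part of
// `block` that overlaps the requested byte range. `blockStart` is
// the block's absolute offset within the whole file.
function extractDataFromBlock (block, blockStart, requestedStart, requestedEnd) {
  const blockLength = block.length

  if (requestedStart >= blockStart + blockLength) {
    // the requested range begins after this block ends
    return Buffer.alloc(0)
  }

  if (requestedEnd >= blockStart && requestedEnd < blockStart + blockLength) {
    // the range ends inside this block: drop the tail
    block = block.slice(0, requestedEnd - blockStart)
  }

  if (requestedStart >= blockStart && requestedStart < blockStart + blockLength) {
    // the range starts inside this block: drop the head
    block = block.slice(requestedStart - blockStart)
  }

  return block
}

// A block holding bytes [10, 20) of the file; request bytes [12, 16).
const block = Buffer.from('0123456789')
console.log(extractDataFromBlock(block, 10, 12, 16).toString()) // -> 2345
```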