Proficient in IPFS: IPFS saves the content below

In the previous article, we pointed out that calling the pull function to save the file in the builder/builder.js file, we will study this process in detail in this article.

  1. Set the source stream to file.content .
  2. Call the chunker stream to block the saved content. From the previous article, we know that the default implementation of the chunker stream is chunker/fixed-size.js , which is a pull-through stream. This stream provides two functions, called onData and onEnd , which are called onData time the data arrives, and the latter is called when the data is sent. fixed-size.js sets the size of each block according to the maxChunkSize property specified in the option during initialization. Below, let's take a look at its two methods in its onData and onEnd .

onData function is handled as follows:

If (!emitted) {
This.queue(Buffer.alloc(0))
}

This.queue(null)
The above logic for fixed partitioning in IPFS is actually very simple.

  • Call the paraMap stream (type pull-paramap) to process each chunk. When the previous stream chunks the file, each partition will pull the next stream. This is the function here. Let's see how this function handles each chunk. Its body is a waterfall function. As the name suggests, each function performs its own processing and passes the result to the next function. Let's take a look at some of its handlers.
  • First, let's look at the first function, which is mainly used to create a DAGNode and pass the relevant information to the second function. Its execution logic is as follows:

    • Generate a UnixFS object.
       const file = new UnixFS(options.leafType, buffer) 

      UnixFS is a protocol-buffer-based format for describing files, directories, and symbolic links in IPFS. Currently it supports: raw data, directories, files, raw data, symbolic links, hamt-sharded-directory and so on.

    leafType defaults to a file, defaultOptions specified by default option defaultOptions when the file is initialized.

  • Call the DAGNode.create static method to create a DAGNode node. After success, pass the belief information to the next function.
     DAGNode.create(file.marshal(), [], (err, node) => {  if (err) {    return cb(err)  } 
  • Cb(null, {
    Size: node.size,
    leafSize: file.fileSize(),
    Data: node
    })
    })
    The main content of the UnixFS marshal method is to encode the file content (byte buffer). Here DAGNode refers to the DAGNode function object defined in dag-node/index.js in the ipld-dag-pb library. Its create method is defined in create.js in the same directory. Let's take a look at this method. . Its main content is to check the partition data of the file and the link of other blocks, and then create the DAGNode object after the sequence of the two. The latter's constructor is relatively simple, and only saves the data of the block and the connection with other blocks (representing the relationship with other blocks). Next, let's look at the second function. Its main function is to save the generated DAGNode to the system and pass the saved result to the next function. Its execution logic is as follows:

    • Call the persist method to save the DAG node. This is a very important step. It not only saves the block object in the local repository, but also whether to save the block CID on the node closest to it. It also involves sending the block through the bitswap protocol to those who want it. It's in the node. Its implementation is as follows:
      • Get the CID version number, hash algorithm, encoding method, etc. from the options.
         let cidVersion = options.cidVersion || defaultOptions.cidVersion let hashAlg = options.hashAlg || defaultOptions.hashAlg let codec = options.codec || defaultOptions.codec 

    If (Buffer.isBuffer(node)) {
    cidVersion = 1
    Codec = 'raw'
    }

    If (hashAlg !== 'sha2-256') {
    cidVersion = 1
    }
    By default, the version number is 0, the hash algorithm is SHA256, and the encoding method is dag-pb, which is a JS implementation based on the Protocol specification.

  • If the option specified is not saved and only the hash value is calculated, then the cid function in util.js in the ipld-dag-pb library is called to get the CID of the DAG node and then return directly.
     if (options.onlyHash) {    return cid(node, {      version: cidVersion,      hashAlg: hashAlg    }, (err, cid) => {      callback(err, {        cid,        node      })    }) } 
  • If it is not just a hash, call the put IPLD object to hold the DAG node.
     ipld.put(node, {        version: cidVersion,        hashAlg: hashAlg,        format: codec    }, (error, cid) => {    callback(error, {        cid,        node    }) }) 

    The IPLD object is defined in the ipld library. IPLD plays a very important role in IPFS. It is the abbreviation of InterPlanetary Linked-Data. It represents the ambition and hope of IPFS. The desire to link everything together can be done with Bitcoin, Ethereum, Zcash, git, etc. It holds ipfs-block-service, which in turn holds the ipfs repository object and the bitswap object, which form the core of ipfs.

  • Let's look at the put method and see how it saves the DAG object. Its main body is to call the internal method to get the format of the current DAG object encoding, and then use the cid method matching this format to get the object's CID object, and then call the internal _put to save the data.

     this._getFormat(options.format, (err, format) => {  if (err) return callback(err) 

    Format.util.cid(node, options, (err, cid) => {
    If (err) {
    Return callback(err)
    }

     if (options.onlyHash) {  return callback(null, cid) } 

    This._put(cid, node, callback)

       }) }) 

    Next, let's look at this internal _put method. The main body of this method is a waterfall function. Several internal functions get the corresponding encoding format according to the CID object, and then serialize the DAG node object using the corresponding method of the encoding format. Block Block object, and call the put method of the block service object to save the block.

    The block service object is defined in the ipfs-block-service library. Its put method determines whether to call the repository object to save the block or to call the bitswap to save the block, depending on whether there is a bitswap object (initialization is empty). For our example, it calls bitswap to save the block.

    The put method of the bitswap object not only saves the block in the underlying blockstore, but also sends it to the nodes that need it. Its main body is a waterfall function, in which the first function checks whether the block storage in this area has this block, and the second determines whether to ignore the call or whether to actually save the block according to whether there is a local block.

     waterfall([  (cb) => this.blockstore.has(block.cid, cb),  (has, cb) => {    if (has) {      return nextTick(cb)    } 
     this._putBlock(block, cb) 
       } ], callback) 

    The _putBlock method of the bitswap object calls the put method of the block storage object to save the block object in the local repository, and after successful, triggers an event that receives the block, and at the same time saves the CID in the nearest through the provide method of the network object. In the node, the engine object's receivedBlocks method is then called to send the received block object to all nodes that want the block.

     this.blockstore.put(block, (err) => {  if (err) {    return callback(err)  } 

    this.notifications.hasBlock(block)
    This.network.provide(block.cid, (err) => {
    If (err) {
    This._log.error('Failed to provide: %s', err.message)
    }
    })

    This.engine.receivedBlocks([block.cid])
    Callback()
    })
    There are two important objects in the bitswap object, one is the network object and the other is the engine object.

    The network object's provide method directly calls the libp2p object's content routing method of the same name to handle the block's CID. The libp2p object's content routing saves all the specific routing methods. By default, it is empty, that is, there is no routing method. Instead, we specify libp2p.config.dht.enabled in the configuration file to specify the content routing. DHT routing, so the CID of the final block will be saved in the most appropriate node.

    In the initial method, the network object specifies its own two methods as the processor of the libp2p object's node connection and disconnection event, so as to get the corresponding notification when connecting and disconnecting, and also call the handle method of the libp2p object. Thus, it becomes the processing object of the two definitions of libp2p object /ipfs/bitswap/1.0.0 and /ipfs/bitswap/1.1.0 , so when libp2p receives these two kinds of messages, it will call the corresponding of the network object object. The method is processed.

    The network object processing bitswap is handled by the pull function. The general flow is as follows: Get the message from the connection object, then deserialize it into a message object, then get its node information object through the connection object, and then call the inside of the bitswap object. The method _receiveMessage handles the incoming message, which in turn calls the messageReceived method of the engine object to process the received message.

    The general flow of the messageReceived method of the engine object is as follows:

    1) Call the internal method _findOrCreate to find or create the general ledger object Ledger of the remote peer node. If it is the newly created general ledger object, it should also be placed in the internal mapping set. The key is the Base58 string of the remote peer node.

    2) If the message is a complete message, generate a new list of wanted requests.

    3) Call the internal method _processBlocks to process the block object in the message.

    4) If the desired list in the message is empty, exit the method.

    5) traversing the desired list in the message. If the currently desired entity is canceled, the corresponding item is removed from the corresponding ledger of the corresponding node and saved in the canceled item list; otherwise, the current item is saved in the corresponding node. In the general ledger, it is also saved in the desired list.

    6) Call the internal method _cancelWants to filter out the canceled tasks in the task, that is, delete the tasks that have been canceled in the task.

    7) Call the internal method _addWants to handle all the desired lists of remote peer nodes. The block storage object is called to determine whether the desired item is already in the local repository, and if so, the corresponding task is generated.

    The receivedBlocks method of the engine object checks all connected remote nodes (general ledger objects) when they receive a specific block to see if they want the block, and if so, generates a task to process in the background.

  • Call the pullThrough stream (type pull-through stream) to process each data received. This process is relatively simple, not to elaborate here.
  • Call the reducer stream to normalize all generated chunks. By default, the reducer stream is generated in balanced/index.js by calling the balanced/balanced-reducer.js function in balanced/balanced-reducer.js . Let's take a look at the execution of this function:
    • Generate pull-pair objects and pull-pushable objects.
       const pair = pullPair() const source = pair.source 
  • Const result = pushable()

  • Call the reduceToParents function to establish an internal pull stream. The main body of the function is a stream created by a pull function. Several of its functions are as follows:
    • The first function is the previously created source stream.
    • The second function is a stream defined by the pull-batch class library. This is a pull-through stream. It implements its own writer and ender functions. It saves the data acquired each time in an internal array. It will not be saved to the queue of pull-through streams after a certain program.
    • The third function is the async-map stream of the pull-stream class library, which is a through stream, similar to the map stream, but with better performance. Its normalization function reduce default the reducefile function returned in builder/reduce.js . Its flow is as follows: 1) If the current number of leaf nodes is 1, and its single flag is true, and there are configurations in the option to group the individual leaves to themselves, then the callback object is called directly; otherwise, the following stream is executed.
       if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) {  const leaf = leaves[0] 
  • Return callback(null, {
    Size: leaf.size,
    leafSize: leaf.leafSize,
    Multihash: leaf.multihash,
    Path: file.path,
    Name: leaf.name
    })
    }
    2) Create a parent node and add all its leaf nodes. When the file size is large, IPFS will block the block, and each block will form the leaf node here. Finally, the leaves will generate the corresponding DAGLink in the order of their block, and then add to the parent DAGNode in turn. The parent DAGNode saves not the file content, but the DAGLink of these leaf nodes, thus forming the complete content of the file.

     const f = new UnixFS('file') 

    Const links = leaves.map((leaf) => {
    f.addBlockSize(leaf.leafSize)

    Return new DAGLink(leaf.name, leaf.size, leaf.multihash)
    })
    3) Call the waterfall function to process the parent node in sequence. This place is similar to processing a single block, which is to create a DAGNode object and call the persist function for persistence. Note: The difference here is that the parent node has leaf nodes, ie links not empty.

     waterfall([  (cb) => DAGNode.create(f.marshal(), links, cb),  (node, cb) => persist(node, ipld, options, cb) ], (error, result) => {  if (error) {    return callback(error)  } 

    Callback(null, {
    Size: result.node.size,
    leafSize: f.fileSize(),
    Multihash: result.cid.buffer,
    Path: file.path,
    Name: ''
    })
    })
    4) After the above waterfall function is processed, call the callback function to continue processing.

    The callback function in the normalization function reduce is the following callback function in the collect stream, ie, the sink stream. When the normal function reads the data, the callback function is called, so that the data is pulled to the collect stream, and then enters the reduced function. deal with.

  • The fourth function is the collect stream of the pull-stream class library, which is a sink stream. Its processing function reduced process is as follows: 1) If there is an error in the previous stream, the callback function of the reduceToParents function is directly called for processing;
  • 2) Otherwise, if the currently received data length is greater than 1, that is, after the previous normalization process, there are still multiple root DAGNodes, then the reduceToParents function is called to continue the normalization process;

    3) Otherwise, call the callback function of the reduceToParents function for processing.

    The callback function of the reduceToParents function, which is a very important function, writes the read data into the pull-pushable stream represented by result inside the function, so as to get the data in the external stream behind it.

  • Returns a two-way stream object. The bidirectional stream object returned here is
     {    sink: pair.sink,    source: result } 

    The sink is the sink stream defined in the pull-pair class library, which is used by the external pull function to read data from the previous stream; source is the stream in the pull-pushable class library, in the callback function of the reduceToParents function. The data is pushed so that the associated stream in the external pull function can read the function from it.

  • Call the collect stream and pass the result of the save file to the external function in the handler for this stream.
     collect((err, roots) => {    if (err) {      callback(err)    } else {      callback(null, roots[0])    } }) 

    The callback here is passed in when the createAndStoreFile function is called, and its call is in the builer/builder.js file. Simply review the calling code:

     createAndStoreFile(item, (err, node) => {  if (err) {    return cb(err)  }  if (node) {    source.push(node)  }  cb() }) 

    The anonymous callback function here is the callback above. In the callback function, the result of saving the file is written into the source stream, thereby passing the data to the outermost pull stream.

  • At this point, we have completely analyzed the core process of saving files/contents. From the beginning to the end, you are not very rewarding. Next, stay tuned for the next article.

    Click to review:

    Proficient in IPFS: IPFS saves content

    Proficient in IPFS: IPFS saves content