Proficient in IPFS: How IPFS Saves Content

After the previous analysis, we understand the IPFS startup process. Starting today, I will analyze some common commands and actions. I hope you find it useful.

Before analyzing these commands and actions, I should briefly introduce the pull-stream library. Without some familiarity with it, the rest of this article is hard to follow.

pull-stream is a stream library in which the destination pulls data from the source. It has two basic stream types: the source and the sink. There are also two composite types: the through stream (for example, a transformation) and the duplex (bidirectional) stream.

A source stream returns an anonymous function called the read function. The downstream sink or through function calls it to read the contents of the source.

A sink stream ultimately returns the sink function defined in the library's internal drain.js. This type of stream reads data and processes each item it reads. When the stream has ended, it calls the user-specified end function.

A through stream returns a nested anonymous function. The first-layer function receives, as its parameter, the read function of a source stream or the read function returned by another through stream. The second-layer function is itself a read function: it receives the callback supplied by the final sink or by another through stream. Internally it calls the upstream read function to fetch data from the source, directly or indirectly, and once the data arrives it calls the downstream callback, directly or indirectly, so that the data reaches its destination.
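The three stream shapes described above can be sketched in a few lines of plain JavaScript. This is a minimal illustration written for this article, not the pull-stream source: the names values, map, and collect mirror pull-stream helpers, but the bodies are simplified and assume a small, synchronous source.

```javascript
// Source: returns a read(abort, cb) function. cb(true) signals the end.
function values (array) {
  let i = 0
  return function read (abort, cb) {
    if (abort) return cb(abort)
    if (i >= array.length) return cb(true)
    cb(null, array[i++])
  }
}

// Through: takes a read function and returns a new read function.
function map (fn) {
  return function (read) {
    return function (abort, cb) {
      read(abort, function (end, data) {
        if (end) return cb(end)
        cb(null, fn(data))
      })
    }
  }
}

// Sink: takes a read function and keeps pulling until the stream ends.
function collect (done) {
  return function (read) {
    const items = []
    function next () {
      read(null, function (end, data) {
        if (end) return done(end === true ? null : end, items)
        items.push(data)
        next()
      })
    }
    next()
  }
}

// Wire the three pieces together by hand: source -> through -> sink.
let result
const source = values([1, 2, 3])
const doubled = map((x) => x * 2)(source)
collect((err, items) => { result = items })(doubled)
console.log(result) // [ 2, 4, 6 ]
```

Nothing moves until the sink calls read, which is what "pull" means here: the destination asks for each item.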

In pull-stream, data flows only through a complete pipeline: one source, zero or more through streams, and one sink. It is still very useful to be able to create partial pipelines. In other words, you can create a complete pipeline, such as pull(source, sink) => undefined, or a partial one, such as pull(through, sink) => sink or pull(through1, through2) => through. We will meet many partial pipelines below. Today we look at the first and most commonly used command/action, add. We use IPFS to save files, so a save operation is indispensable, and that is what add does. Enough preamble; let's look at a piece of code.

    const { createNode } = require('ipfs')

    const node = createNode({
      libp2p: {
        config: {
          dht: {
            enabled: true
          }
        }
      }
    })

    node.on('ready', async () => {
      const content = `I love black firefly`

      const filesAdded = await node.add({
        content: Buffer.from(content)
      }, {
        chunkerOptions: {
          maxChunkSize: 1000,
          avgChunkSize: 1000
        }
      })

      console.log('Added file:', filesAdded[0].path, filesAdded[0].hash)
    })

This time we do not rely entirely on the default configuration: we also enable the DHT. Readers of my earlier articles know what the DHT is, so I won't explain it again here. The program uploads content by calling the add method of the IPFS node. The content can be a file or raw content. The two differ slightly, and I will point out the difference when we reach the relevant code. Here, for simplicity, we upload raw content directly.

The add method is located in the core/components/files-regular/add.js file. In the article "Proficient in IPFS: System Startup", we said that the system extends the IPFS object with every file in the core/components/files-regular directory, which naturally includes add.js. Below, we look directly at the execution flow of this function.

This function returns an internally defined function. That function does some processing on the parameters and then calls the internal add function, which is the main body. Its logic is as follows:

  1. First, check whether the options argument is a function. If so, treat it as the callback and reset options to an empty object.

     if (typeof options === 'function') {
       callback = options
       options = {}
     }

  2. Define helper functions that check whether what we want to upload is valid input.

     const isBufferOrStream = obj => Buffer.isBuffer(obj) || isStream.readable(obj) || isSource(obj)
     const isContentObject = obj => {
       if (typeof obj !== 'object') return false
       if (obj.content) return isBufferOrStream(obj.content)
       return Boolean(obj.path) && typeof obj.path === 'string'
     }

     const isInput = obj => isBufferOrStream(obj) || isContentObject(obj)
     const ok = isInput(data) || (Array.isArray(data) && data.every(isInput))

     if (!ok) {
       return callback(new Error('invalid input: expected buffer, readable stream, pull stream, object or array of objects'))
     }
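To see which inputs pass this check, the helpers can be exercised on their own. The sketch below is only an approximation: isReadable and isSource are simplified stand-ins that I assume behave like isStream.readable and isSource for these cases, and isValid is a hypothetical name for the expression assigned to ok.

```javascript
const { Readable } = require('stream')

// Simplified stand-ins (assumptions) for isStream.readable and isSource.
const isReadable = (obj) => obj instanceof Readable
const isSource = (obj) => typeof obj === 'function' && obj.length === 2

const isBufferOrStream = (obj) => Buffer.isBuffer(obj) || isReadable(obj) || isSource(obj)
const isContentObject = (obj) => {
  if (typeof obj !== 'object') return false
  if (obj.content) return isBufferOrStream(obj.content)
  return Boolean(obj.path) && typeof obj.path === 'string'
}
const isInput = (obj) => isBufferOrStream(obj) || isContentObject(obj)

// Hypothetical wrapper around the expression the article assigns to `ok`.
const isValid = (data) => isInput(data) || (Array.isArray(data) && data.every(isInput))

const checks = {
  buffer: isValid(Buffer.from('hello')),
  pathOnly: isValid({ path: 'docs/a.txt' }),
  contentObject: isValid({ path: 'a.txt', content: Buffer.from('hello') }),
  list: isValid([Buffer.from('a'), { path: 'b.txt' }]),
  stringContent: isValid({ content: 'plain string' }),
  number: isValid(42)
}
console.log(checks)
```

The first four entries are accepted; an object whose content is a plain string and a bare number are rejected.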

  3. Next, execute the pull function provided by the pull-stream library. Let's look at its arguments in turn.

     • The first argument is the result of the pull.values function, a source stream that returns a function called read, which reads the data we provide. The read function takes the value at the current index of the array and passes it to the callback supplied by the next through stream or by the final sink. Once the array is exhausted, it calls the callback with true as the argument.
     • The second argument is the result of the addPullStream method of the IPFS object. This method is attached to the IPFS object at startup in the same way as add. Its body is the function in the add-pull-stream.js file in the same directory. We will look at it in detail shortly; for now we only need to know that it returns a partial pipeline.
     • The third argument is a function from pull-sort, a library built on pull-stream that sorts the results with the given comparison function. We will not look at it further.
     • The last argument is the result of the pull.collect function, a sink stream. It puts the final results into an array and then calls the callback function. This is why the filesAdded value in the earlier code is an array.

     The code for the above logic is as follows:

     pull(
       pull.values([data]),
       self.addPullStream(options),
       sort((a, b) => {
         if (a.path < b.path) return 1
         if (a.path > b.path) return -1
         return 0
       }),
       pull.collect(callback)
     )
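The shape of this pipeline, including a partial pipeline in the middle, can be tried out without IPFS. The following is a rough sketch only: every helper is a simplified stand-in written for this article (not the pull-stream or pull-sort source), and fakeAddPullStream is a hypothetical placeholder that reports path and size instead of saving anything.

```javascript
// Simplified stand-ins for pull-stream helpers (assumptions, not library code).
const values = (array) => {
  let i = 0
  return (abort, cb) => (abort || i >= array.length) ? cb(abort || true) : cb(null, array[i++])
}
const map = (fn) => (read) => (abort, cb) =>
  read(abort, (end, data) => end ? cb(end) : cb(null, fn(data)))
const flatten = () => (read) => {
  let pending = []
  return function next (abort, cb) {
    if (abort) return read(abort, cb)
    if (pending.length > 0) return cb(null, pending.shift())
    read(null, (end, chunk) => {
      if (end) return cb(end)
      pending = chunk.slice()
      next(null, cb)
    })
  }
}
// Reads everything, sorts it, then emits the items one by one.
const sort = (compare) => (read) => {
  let sorted = null
  const emit = (cb) => sorted.length > 0 ? cb(null, sorted.shift()) : cb(true)
  return (abort, cb) => {
    if (abort) return read(abort, cb)
    if (sorted) return emit(cb)
    const items = []
    const drain = () => read(null, (end, data) => {
      if (end === true) {
        sorted = items.sort(compare)
        return emit(cb)
      }
      if (end) return cb(end)
      items.push(data)
      drain()
    })
    drain()
  }
}
const collect = (done) => (read) => {
  const items = []
  const next = () => read(null, (end, data) => {
    if (end) return done(end === true ? null : end, items)
    items.push(data)
    next()
  })
  next()
}
// A through takes one argument (read), a source takes two (abort, cb).
function pull (...streams) {
  if (streams[0].length === 1) return (read) => pull(read, ...streams) // partial pipeline
  return streams.slice(1).reduce((read, stream) => stream(read), streams[0])
}

// Hypothetical stand-in for addPullStream: a partial pipeline of throughs.
const fakeAddPullStream = () => pull(
  map((content) => Array.isArray(content) ? content : [content]),
  flatten(),
  map((entry) => ({ path: entry.path, size: entry.content.length }))
)

const data = [
  { path: 'a.txt', content: Buffer.from('first') },
  { path: 'b.txt', content: Buffer.from('second!') }
]

let filesAdded
pull(
  values([data]),
  fakeAddPullStream(),
  sort((a, b) => {
    if (a.path < b.path) return 1
    if (a.path > b.path) return -1
    return 0
  }),
  collect((err, files) => { filesAdded = files })
)
console.log(filesAdded.map((f) => f.path)) // [ 'b.txt', 'a.txt' ]
```

With this comparator the larger path sorts first, which is why b.txt comes before a.txt.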

    In the pull call of the add function, the content to be saved is wrapped in an array; the reason is explained below. Now let's look at the addPullStream method, which does the main work of saving content; the add method is only an appetizer. The addPullStream method executes as follows:

    1. Call the parseChunkerString function to handle the options related to chunking. This function is located in the utils.js file in the same directory and checks which chunking algorithm the user specified. If none is specified, the fixed-size chunker is used with the system default size of 262144 bytes. If a size is specified, the fixed-size chunker is used with that size. If rabin is specified, the variable-length chunker is used, and an internal function is called to generate the corresponding chunker options. The code is as follows:

       const parseChunkerString = (chunker) => {
         if (!chunker) {
           return {
             chunker: 'fixed'
           }
         } else if (chunker.startsWith('size-')) {
           const sizeStr = chunker.split('-')[1]
           const size = parseInt(sizeStr)
           if (isNaN(size)) {
             throw new Error('Chunker parameter size must be an integer')
           }
           return {
             chunker: 'fixed',
             chunkerOptions: {
               maxChunkSize: size
             }
           }
         } else if (chunker.startsWith('rabin')) {
           return {
             chunker: 'rabin',
             chunkerOptions: parseRabinString(chunker)
           }
         } else {
           throw new Error(`Unrecognized chunker option: ${chunker}`)
         }
       }

      Note: We can also add our own chunking algorithm by rewriting this function.

    2. Merge the options.

       const opts = Object.assign({}, {
         shardSplitThreshold: self._options.EXPERIMENTAL.sharding
           ? 1000
           : Infinity
       }, options, chunkerOptions)

    3. Set the default CID version. If a hash algorithm is specified but the CID version is not 1, the version is forced to 1. A CID is a self-describing content-addressed identifier for distributed systems. There are currently two versions, 0 and 1. Version 0 is kept for backward compatibility; it only supports the sha256 hash algorithm, and the algorithm cannot be changed.

       if (opts.hashAlg && opts.cidVersion !== 1) {
         opts.cidVersion = 1
       }

    4. Set the progress handler. The default is an empty implementation (noop).

       let total = 0
       const prog = opts.progress || noop
       const progress = (bytes) => {
         total += bytes
         prog(total)
       }

       opts.progress = progress
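Steps 1 to 4 can be tried out in isolation. The sketch below makes several assumptions: buildOptions and its shardingEnabled parameter are hypothetical names, the chunker string is assumed to arrive as options.chunker, and the rabin branch is left out because parseRabinString is not shown in this article.

```javascript
const noop = () => {}

// Reduced copy of parseChunkerString: the rabin branch is omitted on purpose.
function parseChunkerString (chunker) {
  if (!chunker) return { chunker: 'fixed' }
  if (chunker.startsWith('size-')) {
    const size = parseInt(chunker.split('-')[1], 10)
    if (isNaN(size)) throw new Error('Chunker parameter size must be an integer')
    return { chunker: 'fixed', chunkerOptions: { maxChunkSize: size } }
  }
  throw new Error(`Unrecognized chunker option: ${chunker}`)
}

// Hypothetical helper that strings steps 1 to 4 together.
function buildOptions (options, shardingEnabled) {
  // Step 1: parse the chunker string (assumed to live in options.chunker).
  const chunkerOptions = parseChunkerString(options.chunker)

  // Step 2: defaults first, then user options, then parsed chunker settings.
  const opts = Object.assign({}, {
    shardSplitThreshold: shardingEnabled ? 1000 : Infinity
  }, options, chunkerOptions)

  // Step 3: a custom hash algorithm forces CID version 1.
  if (opts.hashAlg && opts.cidVersion !== 1) opts.cidVersion = 1

  // Step 4: wrap the progress handler so it reports a running total.
  let total = 0
  const prog = opts.progress || noop
  opts.progress = (bytes) => {
    total += bytes
    prog(total)
  }
  return opts
}

const seen = []
const opts = buildOptions({
  chunker: 'size-1024',
  hashAlg: 'sha2-512',
  progress: (n) => seen.push(n)
}, false)
opts.progress(100)
opts.progress(50)
console.log(opts.chunker, opts.chunkerOptions.maxChunkSize, opts.cidVersion, seen)
// fixed 1024 1 [ 100, 150 ]
```

Because later arguments to Object.assign win, the parsed chunker value 'fixed' replaces the raw 'size-1024' string in the merged options.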

  • Use the pull function to build and return a partial pull-stream pipeline. This partial pipeline is the key to saving files/content, so let's take a closer look.
    1. First call the pull.map method to process the content to be saved. pull.map is a through stream that applies the specified handler to each element passing through it. This is why the add function wraps the content to be saved in an array. Here the handler is normalizeContent. This function is defined in the same file. It first checks whether the content is an array and, if not, converts it to one. It then processes each element of the array as follows:
      • If the content is a Buffer object, it is converted into an object whose path is an empty string and whose content is a pull-stream source.

         if (Buffer.isBuffer(data)) {
           data = { path: '', content: pull.values([data]) }
         }

      • If the content is a Node.js readable stream, such as a file, it is converted into an object whose path is an empty string; the source method of the stream-to-pull-stream library turns the Node.js stream into a pull-stream source.

         if (isStream.readable(data)) {
           data = { path: '', content: toPull.source(data) }
         }

      • If the content is already a pull-stream source, it is converted into an object whose path is an empty string and whose content is left unchanged.

         if (isSource(data)) {
           data = { path: '', content: data }
         }

      • If the content is an object whose content attribute exists and is not a function, proceed as follows:

         if (data && data.content && typeof data.content !== 'function') {
           if (Buffer.isBuffer(data.content)) {
             data.content = pull.values([data.content])
           }

           if (isStream.readable(data.content)) {
             data.content = toPull.source(data.content)
           }
         }

      • If the wrapWithDirectory option is set, a path is required and the wrapper prefix is added to it.

         if (opts.wrapWithDirectory && !data.path) {
           throw new Error('Must provide a path when wrapping with a directory')
         }

         if (opts.wrapWithDirectory) {
           data.path = WRAPPER + data.path
         }

      • Return the resulting content to be saved.
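The normalization rules above can be condensed into one small function for experimentation. This is a sketch under stated assumptions: values and isSource are simplified stand-ins for pull.values and isSource, the value of WRAPPER is assumed (the article does not give it), and the Node.js readable stream branches are left out to keep the example free of external libraries.

```javascript
const WRAPPER = 'wrapper/' // assumed value, not given in the article

// Simplified stand-ins (assumptions) for pull.values and isSource.
const values = (array) => {
  let i = 0
  return (abort, cb) => (abort || i >= array.length) ? cb(abort || true) : cb(null, array[i++])
}
const isSource = (obj) => typeof obj === 'function' && obj.length === 2

function normalizeContent (content, opts) {
  if (!Array.isArray(content)) content = [content]

  return content.map((data) => {
    // A bare Buffer or pull-stream source becomes { path: '', content: <source> }.
    if (Buffer.isBuffer(data)) data = { path: '', content: values([data]) }
    if (isSource(data)) data = { path: '', content: data }

    // An object carrying a Buffer gets that Buffer wrapped in a source.
    if (data && data.content && Buffer.isBuffer(data.content)) {
      data.content = values([data.content])
    }

    // Wrapping in a directory needs a path to hang the entry on.
    if (opts.wrapWithDirectory && !data.path) {
      throw new Error('Must provide a path when wrapping with a directory')
    }
    if (opts.wrapWithDirectory) data.path = WRAPPER + data.path

    return data
  })
}

const plain = normalizeContent(Buffer.from('I love black firefly'), {})
const wrapped = normalizeContent(
  { path: 'a.txt', content: Buffer.from('x') },
  { wrapWithDirectory: true }
)
console.log(plain[0].path === '', typeof plain[0].content) // true function
console.log(wrapped[0].path) // wrapper/a.txt
```

Whatever form the input takes, each entry leaves this step as an object with a path and a pull-stream source as content, which is the uniform shape the later stages rely on.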
  • Call the pull.flatten() method to flatten the arrays generated in the previous step. flatten is a through stream that turns a stream of streams or arrays into a single stream of their items. For example, given the following arrays:

     [
       [1, 2, 3],
       [4, 5, 6],
       [7, 8, 9]
     ]

    the result after flattening is the following array:

     [1, 2, 3, 4, 5, 6, 7, 8, 9]

  • Call the importer function to save the content. This function is defined in the ipfs-unixfs-importer library, which is the JavaScript implementation of IPFS file layout and chunking. How the content is saved and how it is chunked will be analyzed in detail in the next article.
  • Call the pull.asyncMap method to process the saved files/content and generate the result that the user sees. By the time execution reaches this point, the file or content has already been saved in the local IPFS repository, and we can use the API or commands such as get and ls to view it. asyncMap is a through stream, similar to map, except that its handler is asynchronous and delivers its result through a callback. It processes each element; here the handler is prepareFile.
  • prepareFile is defined in the same file, and its processing is as follows:

    • Generate a CID object from the multihash of the generated file.

       let cid = new CID(file.multihash)

      The CID constructor checks the argument it is given. If it is a CID object, the version, codec, multihash, and other attributes are taken directly from that object. If it is a string, the constructor checks whether it is multibase encoded: if so, it is decoded first and the attributes are then separated out; if not, it must be a base58 string, so the version is set to 0, the codec to dag-pb, and the multihash is obtained from the base58 string. If it is a Buffer object, the first byte is read and converted to an integer from its hexadecimal form: if that value is 0 or 1, the corresponding attributes are parsed; otherwise the buffer is treated as a multihash, the version is set to 0, and the codec is set to dag-pb.
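The Buffer branch of that check is easy to reproduce. The sketch below only mirrors the first-byte test described above; describeCidInput is a hypothetical helper, not part of the CID library, and the sample buffers are zero-filled placeholders rather than real hashes.

```javascript
// Hypothetical helper mirroring the first-byte test described in the text.
function describeCidInput (buf) {
  const firstByte = parseInt(buf.slice(0, 1).toString('hex'), 16)
  if (firstByte === 0 || firstByte === 1) {
    // The buffer starts with a CID version number.
    return { kind: 'cid', version: firstByte }
  }
  // Anything else is treated as a bare multihash: version 0, dag-pb.
  return { kind: 'multihash', version: 0, codec: 'dag-pb' }
}

// A sha2-256 multihash starts with 0x12 (hash code) and 0x20 (32-byte length).
const sha256Multihash = Buffer.concat([Buffer.from([0x12, 0x20]), Buffer.alloc(32)])

// A binary CIDv1 starts with 0x01 (version) and a codec code (0x70 is dag-pb).
const cidV1Bytes = Buffer.concat([Buffer.from([0x01, 0x70]), sha256Multihash])

console.log(describeCidInput(sha256Multihash)) // { kind: 'multihash', version: 0, codec: 'dag-pb' }
console.log(describeCidInput(cidV1Bytes)) // { kind: 'cid', version: 1 }
```

Since file.multihash is a bare multihash, new CID(file.multihash) takes the second path and produces a version 0 CID.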

    • If the user specifies CID version 1, convert the CID object to version 1.

       if (opts.cidVersion === 1) {
         cid = cid.toV1()
       }

    • Next, call the waterfall method to run the functions passed to it in sequence. The first function checks whether the onlyHash option is set, which means the file is not actually uploaded to the IPFS network and only its hash is computed. If so, it passes the file straight to the second function; otherwise it calls the object.get method of the IPFS object to fetch the node information of the file saved in the repository. That method will be explained in detail in a later article, so I will not cover it here. The second function generates the object that is ultimately returned to the user, which includes the path, size, hash, and so on.

    The code is as follows. It is relatively simple, so you can read through it yourself.

       waterfall([
         (cb) => opts.onlyHash
           ? cb(null, file)
           : self.object.get(file.multihash, Object.assign({}, opts, { preload: false }), cb),
         (node, cb) => {
           const b58Hash = cid.toBaseEncodedString()

           let size = node.size

           if (Buffer.isBuffer(node)) {
             size = node.length
           }

           cb(null, {
             path: opts.wrapWithDirectory
               ? file.path.substring(WRAPPER.length)
               : (file.path || b58Hash),
             hash: b58Hash,
             size
           })
         }
       ], callback)
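The control flow of this waterfall call can be followed with stubs. In the sketch below, waterfall is a tiny stand-in for the async library's function, store is a hypothetical replacement for self.object, file.hash stands in for cid.toBaseEncodedString(), and the wrapWithDirectory handling is left out. None of it is the actual js-ipfs code.

```javascript
// Tiny stand-in for waterfall: run tasks in order, feeding results forward.
function waterfall (tasks, done) {
  let i = 0
  function next (err, ...results) {
    if (err || i === tasks.length) return done(err, ...results)
    tasks[i++](...results, next)
  }
  next(null)
}

// Simplified prepareFile: `store` is a hypothetical stand-in for self.object.
function prepareFile (file, store, opts, callback) {
  const hash = file.hash // stand-in for cid.toBaseEncodedString()
  waterfall([
    // With onlyHash the file itself is passed on; otherwise the node is fetched.
    (cb) => opts.onlyHash ? cb(null, file) : store.get(file.hash, cb),
    // Build the object that is returned to the caller.
    (node, cb) => {
      let size = node.size
      if (Buffer.isBuffer(node)) size = node.length
      cb(null, { path: file.path || hash, hash, size })
    }
  ], callback)
}

const file = { path: '', hash: 'QmExample', size: 20 }
const store = { get: (hash, cb) => cb(null, { size: 28 }) }

let hashOnly, stored
prepareFile(file, store, { onlyHash: true }, (err, res) => { hashOnly = res })
prepareFile(file, store, {}, (err, res) => { stored = res })
console.log(hashOnly.size, stored.size) // 20 28
```

Because the path is empty here, the hash doubles as the path in both results, matching the file.path || b58Hash fallback in the original code.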
  • Call the pull.map method to preload the files that have been saved locally to the specified nodes. map is a through stream that processes each element; here the handler is preloadFile. This function is defined in the same file and preloads the saved file to the specified nodes. For which nodes are used, see the preload.addresses setting described in "Proficient in IPFS: System Startup Overview"; they can also be specified manually.
  • Call the pull.asyncMap method to pin the files that have been saved locally, ensuring that they are kept long-term and are not garbage collected. asyncMap is a through stream; here the handler is pinFile. We will analyze the pin operation in detail in a later article and will not cover it here. Readers can read the relevant code themselves.
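The last two stages differ only in how the handler hands back its result: map uses the return value, asyncMap uses a callback. The sketch below illustrates that difference with simplified stand-ins for the pull-stream helpers. The preloadFile and pinFile handlers here are hypothetical and only tag the file so the flow is visible; they do not preload or pin anything.

```javascript
// Simplified stand-ins for pull-stream helpers (assumptions, not library code).
const values = (array) => {
  let i = 0
  return (abort, cb) => (abort || i >= array.length) ? cb(abort || true) : cb(null, array[i++])
}
// map: the handler returns the new value directly.
const map = (fn) => (read) => (abort, cb) =>
  read(abort, (end, data) => end ? cb(end) : cb(null, fn(data)))
// asyncMap: the handler reports the new value (or an error) via a callback.
const asyncMap = (fn) => (read) => (abort, cb) =>
  read(abort, (end, data) => {
    if (end) return cb(end)
    fn(data, (err, mapped) => err ? read(err, () => cb(err)) : cb(null, mapped))
  })
const collect = (done) => (read) => {
  const items = []
  const next = () => read(null, (end, data) => {
    if (end) return done(end === true ? null : end, items)
    items.push(data)
    next()
  })
  next()
}

// Hypothetical handlers: they only tag the file so the flow is visible.
const preloadFile = (file) => Object.assign({}, file, { preloaded: true })
const pinFile = (file, cb) => cb(null, Object.assign({}, file, { pinned: true }))

let out
collect((err, files) => { out = files })(
  asyncMap(pinFile)(map(preloadFile)(values([{ hash: 'QmExample' }])))
)
console.log(out) // [ { hash: 'QmExample', preloaded: true, pinned: true } ]
```

A callback-based handler is what lets a stage such as pinning wait for repository work to finish before the next item is pulled.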