Proficient in IPFS: IPFS saves content (part 2)

In the previous article, we saw that saving a file ultimately calls a pull function in the builder/builder.js file; in this article, we will study this process in detail.
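Before diving in, here is a rough map of the pipeline this article walks through, written in pull-stream style as in builder/builder.js. This is a sketch, not the actual source: the handler names createLeaf and onChunk are placeholders for the logic described in the sections below.

```js
// Pseudocode outline of the save pipeline; identifiers are placeholders.
pull(
  file.content,            // source stream: the file's content
  chunker,                 // split the content into fixed-size chunks
  paraMap(createLeaf),     // build and persist a DAGNode per chunk
  pullThrough(onChunk),    // simple per-chunk processing
  reducer,                 // reduce all leaves under a single root DAGNode
  collect((err, roots) => {
    // hand the single root node back to the caller
  })
)
```

The sections below walk through each of these stages in turn.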
- Set the source stream to file.content.
- Call the chunker stream to chunk the content being saved. From the previous article, we know that the default implementation of the chunker stream is chunker/fixed-size.js, which is a pull-through stream. This stream provides two functions, onData and onEnd: the former is called each time data arrives, and the latter is called when the data is exhausted. fixed-size.js sets the size of each chunk according to the maxChunkSize property specified in the options during initialization. Below, let's take a look at its onData and onEnd methods.
The onData function works as follows:
- Each time data is received, it is appended to the BufferList, and its length is added to the running total of data read.

```js
bl.append(buffer)
currentLength += buffer.length
```

- If the current data length is greater than or equal to the specified chunk size, the following loop runs until the current data length falls below the chunk size.
  - A chunk-size slice is taken from the buffer list and pushed into the queue.

```js
this.queue(bl.slice(0, maxSize))
```

  - If the buffer list's length is exactly equal to the chunk size, a fresh buffer list is created and the current length is reset to 0. Otherwise, a new buffer list is created, the remaining data (everything after the chunk-size bytes emitted above) is copied from the old buffer list into it, the old buffer list is replaced by the new one, and the chunk size is subtracted from the current length, thereby updating both the buffer list and its length.

```js
if (maxSize === bl.length) {
  bl = new BufferList()
  currentLength = 0
} else {
  const newBl = new BufferList()
  newBl.append(bl.shallowSlice(maxSize))
  bl = newBl
  currentLength -= maxSize
}
```
Having read the onData method, let's look at the onEnd function. This function first checks whether there is leftover data in the buffer list (less than one chunk size), and if so, pushes it into the queue.

```js
if (currentLength) {
  this.queue(bl.slice(0, currentLength))
  emitted = true
}
```

If no chunk has been emitted at all, an empty buffer is pushed into the queue.

```js
if (!emitted) {
  this.queue(Buffer.alloc(0))
}
```
Finally, null is pushed to signal the end of the stream.

```js
this.queue(null)
```
The above logic for fixed partitioning in IPFS is actually very simple.
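Putting the onData and onEnd pieces together, here is a minimal runnable sketch of a fixed-size chunker in the same style, assuming the pull-through and bl libraries described above. This is an illustration based on the logic just walked through, not the exact source.

```js
const through = require('pull-through')
const BufferList = require('bl')

function fixedSizeChunker (options) {
  const maxSize = options.maxChunkSize
  let bl = new BufferList()
  let currentLength = 0
  let emitted = false

  return through(
    function onData (buffer) {
      // Accumulate incoming data and track how much is buffered.
      bl.append(buffer)
      currentLength += buffer.length

      // Emit full-size chunks while enough data is buffered.
      while (currentLength >= maxSize) {
        this.queue(bl.slice(0, maxSize))
        emitted = true

        if (maxSize === bl.length) {
          bl = new BufferList()
          currentLength = 0
        } else {
          // Keep only the bytes after the chunk just emitted.
          const newBl = new BufferList()
          newBl.append(bl.shallowSlice(maxSize))
          bl = newBl
          currentLength -= maxSize
        }
      }
    },
    function onEnd () {
      // Flush any remaining partial chunk.
      if (currentLength) {
        this.queue(bl.slice(0, currentLength))
        emitted = true
      }
      // Guarantee at least one (possibly empty) chunk for empty inputs.
      if (!emitted) {
        this.queue(Buffer.alloc(0))
      }
      // Signal end of stream.
      this.queue(null)
    }
  )
}
```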
- Next, call the paraMap stream (type pull-paramap) to process each chunk. As the previous stream chunks the file, each chunk is pulled into this stream. Let's see how its handler processes each chunk. Its body is a waterfall function; as the name suggests, each function performs its own processing and passes its result to the next. Let's look at its handlers. The first is mainly used to create a DAGNode and pass the relevant information on to the second. Its execution logic is as follows:
- Generate a UnixFS object.

```js
const file = new UnixFS(options.leafType, buffer)
```
UnixFS is a protocol-buffer-based format for describing files, directories, and symbolic links in IPFS. It currently supports: raw data, directories, files, metadata, symbolic links, hamt-sharded-directories, and so on.
leafType defaults to 'file', as specified by defaultOptions during initialization.
- Call the DAGNode.create static method to create a DAGNode. On success, pass the relevant information to the next function.

```js
DAGNode.create(file.marshal(), [], (err, node) => {
  if (err) {
    return cb(err)
  }

  cb(null, {
    size: node.size,
    leafSize: file.fileSize(),
    data: node
  })
})
```
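As a quick aside, here is a small usage sketch of the UnixFS and DAGNode pairing above, assuming the classic callback-style ipfs-unixfs and ipld-dag-pb APIs used in this article (exact signatures may differ between versions).

```js
const UnixFS = require('ipfs-unixfs')
const { DAGNode } = require('ipld-dag-pb')

const file = new UnixFS('file', Buffer.from('hello world'))
console.log(file.fileSize()) // 11: the size of the raw content

// marshal() protobuf-encodes the UnixFS metadata and content; that
// encoding becomes the data of the DAGNode. A leaf has no links.
DAGNode.create(file.marshal(), [], (err, node) => {
  if (err) throw err
  console.log(node.size) // serialized size of the whole node
})
```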
The main content of the UnixFS marshal method is to protobuf-encode the file content (a byte buffer). Here, DAGNode refers to the DAGNode function object defined in dag-node/index.js in the ipld-dag-pb library; its create method is defined in create.js in the same directory. The main content of this method is to validate the file's chunk data and its links to other blocks, serialize both, and then create the DAGNode object. The DAGNode constructor itself is relatively simple: it only stores the block's data and its connections with other blocks (representing its relationship to them).

Next, let's look at the second function. Its main job is to save the generated DAGNode to the system and pass the saved result to the next function. Its execution logic is as follows:
- Call the persist method to save the DAG node. This is a very important step: it not only saves the block object in the local repository, but may also announce the block's CID to the nodes closest to it, and send the block through the bitswap protocol to the nodes that want it. Its implementation is as follows:
  - Get the CID version number, hash algorithm, and codec from the options.

```js
let cidVersion = options.cidVersion || defaultOptions.cidVersion
let hashAlg = options.hashAlg || defaultOptions.hashAlg
let codec = options.codec || defaultOptions.codec
```
  - Adjust these values for special cases: if the node is a raw buffer, use CID version 1 and the 'raw' codec; if the hash algorithm is not sha2-256, CID version 1 is required.

```js
if (Buffer.isBuffer(node)) {
  cidVersion = 1
  codec = 'raw'
}

if (hashAlg !== 'sha2-256') {
  cidVersion = 1
}
```
By default, the version number is 0, the hash algorithm is sha2-256, and the codec is dag-pb, a protobuf-based format whose JS implementation is the ipld-dag-pb library.
  - If the onlyHash option is set, the cid function in util.js of the ipld-dag-pb library is called to compute the CID of the DAG node, and the method returns directly.

```js
if (options.onlyHash) {
  return cid(node, {
    version: cidVersion,
    hashAlg: hashAlg
  }, (err, cid) => {
    callback(err, { cid, node })
  })
}
```
  - Otherwise, call the put method of the IPLD object to save the DAG node.

```js
ipld.put(node, {
  version: cidVersion,
  hashAlg: hashAlg,
  format: codec
}, (error, cid) => {
  callback(error, { cid, node })
})
```
The IPLD object is defined in the ipld library. IPLD plays a very important role in IPFS. It is short for InterPlanetary Linked Data and represents the ambition of IPFS: the desire to link everything together, whether it lives in Bitcoin, Ethereum, Zcash, git, and so on. It holds the ipfs-block-service object, which in turn holds the IPFS repository object and the bitswap object; together they form the core of IPFS.
Let's look at the put method and see how it saves the DAG object. Its main body calls an internal method to get the encoding format of the current DAG object, uses that format's cid method to compute the object's CID, and then calls the internal _put method to save the data.

```js
this._getFormat(options.format, (err, format) => {
  if (err) return callback(err)

  format.util.cid(node, options, (err, cid) => {
    if (err) {
      return callback(err)
    }

    if (options.onlyHash) {
      return callback(null, cid)
    }

    this._put(cid, node, callback)
  })
})
```
Next, let's look at this internal _put method. Its main body is a waterfall function: its internal functions look up the encoding format matching the CID, serialize the DAG node with that format into a Block object, and call the put method of the block service object to save the block.
The block service object is defined in the ipfs-block-service library. Its put method decides whether to call the repository object or bitswap to save the block, depending on whether a bitswap object has been set (it is empty at initialization). In our example, it calls bitswap to save the block.
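That decision can be sketched roughly as follows. This is modeled on ipfs-block-service but simplified; the exact API may differ across versions.

```js
class BlockService {
  constructor (repo) {
    this._repo = repo
    this._bitswap = null
  }

  // Called once bitswap comes online.
  setExchange (bitswap) {
    this._bitswap = bitswap
  }

  put (block, callback) {
    // With bitswap present, let bitswap store the block (and announce
    // it to interested peers); otherwise write straight into the repo.
    if (this._bitswap) {
      this._bitswap.put(block, callback)
    } else {
      this._repo.blocks.put(block, callback)
    }
  }
}
```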
The put method of the bitswap object not only saves the block in the underlying blockstore, but also sends it to the nodes that need it. Its main body is a waterfall function, in which the first function checks whether the local blockstore already has the block, and the second either skips the save (if the block exists locally) or actually saves it.

```js
waterfall([
  (cb) => this.blockstore.has(block.cid, cb),
  (has, cb) => {
    if (has) {
      return nextTick(cb)
    }

    this._putBlock(block, cb)
  }
], callback)
```
The _putBlock method of the bitswap object calls the put method of the blockstore object to save the block in the local repository. On success, it fires an event announcing that the block has been received, announces the block's CID to the closest nodes through the provide method of the network object, and calls the engine object's receivedBlocks method to send the received block to all nodes that want it.

```js
this.blockstore.put(block, (err) => {
  if (err) {
    return callback(err)
  }

  this.notifications.hasBlock(block)
  this.network.provide(block.cid, (err) => {
    if (err) {
      this._log.error('Failed to provide: %s', err.message)
    }
  })

  this.engine.receivedBlocks([block.cid])
  callback()
})
```
There are two important objects in the bitswap object, one is the network object and the other is the engine object.
The network object's provide method directly calls the libp2p object's content-routing method of the same name to handle the block's CID. The libp2p object's content routing holds all the concrete routing implementations. By default it is empty, i.e. there is no routing method; when we set libp2p.config.dht.enabled in the configuration file, DHT routing is used as the content routing, so the block's CID is ultimately stored on the most appropriate nodes.
In its initialization method, the network object registers two of its own methods as handlers for the libp2p object's node connect and disconnect events, so that it is notified on connection and disconnection. It also calls the handle method of the libp2p object to become the handler for the two protocols /ipfs/bitswap/1.0.0 and /ipfs/bitswap/1.1.0 defined by libp2p, so when libp2p receives messages for either protocol, it calls the corresponding method of the network object to process them.
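A hedged sketch of the network object's registration and provide logic, assuming the callback-style libp2p API of that era; method and event names are modeled on js-ipfs-bitswap but are not guaranteed to match the actual source.

```js
const BITSWAP100 = '/ipfs/bitswap/1.0.0'
const BITSWAP110 = '/ipfs/bitswap/1.1.0'

class Network {
  constructor (libp2p, bitswap) {
    this.libp2p = libp2p
    this.bitswap = bitswap
  }

  start () {
    // Register as the handler for both bitswap protocol versions.
    this.libp2p.handle(BITSWAP100, this._onConnection.bind(this))
    this.libp2p.handle(BITSWAP110, this._onConnection.bind(this))

    // Get notified when peers connect and disconnect.
    this.libp2p.on('peer:connect', (peer) => this.bitswap._onPeerConnected(peer.id))
    this.libp2p.on('peer:disconnect', (peer) => this.bitswap._onPeerDisconnected(peer.id))
  }

  // Delegates to libp2p's content routing of the same name; with
  // libp2p.config.dht.enabled set, this announces the CID to the
  // closest peers via the DHT.
  provide (cid, callback) {
    this.libp2p.contentRouting.provide(cid, callback)
  }

  _onConnection (protocol, conn) {
    // See the receive pipeline sketched in the next section.
  }
}
```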
The network object handles incoming bitswap messages with a pull function. The general flow is as follows: read the message from the connection object, deserialize it into a message object, obtain the peer info through the connection object, then call the bitswap object's internal _receiveMessage method to handle the incoming message, which in turn calls the messageReceived method of the engine object to process it.
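Roughly, the receive path looks like the following sketch. Length-prefixed decoding is omitted; Message stands for the bitswap message codec, and the names are modeled on js-ipfs-bitswap's network.js rather than copied from it.

```js
const pull = require('pull-stream')
// Message stands for the bitswap message class (types/message
// in js-ipfs-bitswap).

function onConnection (bitswap, protocol, conn) {
  pull(
    conn,
    // Deserialize each raw payload into a bitswap message object.
    pull.asyncMap((data, cb) => Message.deserialize(data, cb)),
    pull.asyncMap((msg, cb) => {
      // Find out which peer sent the message, then hand it to bitswap,
      // which forwards it to the engine's messageReceived.
      conn.getPeerInfo((err, peerInfo) => {
        if (err) { return cb(err) }
        bitswap._receiveMessage(peerInfo.id, msg, cb)
      })
    }),
    pull.onEnd((err) => {
      if (err) { bitswap._receiveError(err) }
    })
  )
}
```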
The general flow of the engine object's messageReceived method is as follows:

1) Call the internal method _findOrCreate to find or create the ledger object (Ledger) for the remote peer. If the ledger was newly created, it is also placed into an internal map whose key is the Base58 string of the remote peer.

2) If the message is a full message, generate a new wantlist.

3) Call the internal method _processBlocks to process the block objects in the message.

4) If the wantlist in the message is empty, exit the method.

5) Traverse the wantlist in the message. If the current entry is a cancel, remove the corresponding item from the peer's ledger and save it in the cancelled list; otherwise, save the entry both in the peer's ledger and in the wanted list.

6) Call the internal method _cancelWants to remove the cancelled entries from the task queue.

7) Call the internal method _addWants to handle all the wanted entries of the remote peer. The blockstore object is consulted to determine whether each wanted item is already in the local repository, and if so, a corresponding send task is generated.
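To tie the seven steps together, here is a skeleton of the method, modeled on js-ipfs-bitswap's decision engine; error handling and details are simplified, and Wantlist stands for bitswap's wantlist data structure.

```js
class Engine {
  messageReceived (peerId, msg, callback) {
    const ledger = this._findOrCreate(peerId)        // step 1

    if (msg.full) {
      ledger.wantlist = new Wantlist()               // step 2: full message replaces the wantlist
    }

    this._processBlocks(msg.blocks)                  // step 3

    if (msg.wantlist.size === 0) {
      return callback()                              // step 4: nothing wanted
    }

    const cancels = []
    const wants = []
    msg.wantlist.forEach((entry) => {                // step 5
      if (entry.cancel) {
        ledger.cancelWant(entry.cid)
        cancels.push(entry)
      } else {
        ledger.wants(entry.cid, entry.priority)
        wants.push(entry)
      }
    })

    this._cancelWants(ledger, peerId, cancels)       // step 6
    this._addWants(ledger, peerId, wants, callback)  // step 7
  }
}
```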
The receivedBlocks method of the engine object checks the ledgers of all connected remote peers whenever a block is received, to see whether any of them wants the block; if so, it generates a task to send it, to be processed in the background.
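A hedged sketch of that logic; ledgerMap, wantlistContains, and the task queue are modeled on js-ipfs-bitswap's engine, with details simplified.

```js
class Engine {
  receivedBlocks (cids) {
    if (!cids.length) {
      return
    }

    // For every connected peer's ledger, check whether the peer wants
    // one of the freshly received blocks; if so, queue a send task.
    this.ledgerMap.forEach((ledger) => {
      cids
        .map((cid) => ledger.wantlistContains(cid))
        .filter(Boolean)
        .forEach((entry) => {
          this._tasks.push({ entry, target: ledger.partner })
        })
    })

    // Process the queued send tasks in the background.
    this._scheduleProcessTasks()
  }
}
```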
- Next, call the pullThrough stream (type pull-through) to process each piece of data received. This step is relatively simple, so we will not elaborate on it here.
- Then, call the reducer stream to reduce all the generated chunks. By default, the reducer stream is generated in balanced/index.js by calling the function defined in balanced/balanced-reducer.js. Let's take a look at how this function executes:
- Generate a pull-pair object and a pull-pushable object.

```js
const pair = pullPair()
const source = pair.source
const result = pushable()
```
- Call the reduceToParents function to build an internal pull stream. The body of the function is a stream created by a pull function, whose component streams are as follows:
- The first function is the previously created source stream.
- The second function is a stream defined by the pull-batch library. This is a pull-through stream that implements its own writer and ender functions. It accumulates each piece of data it receives in an internal array, and only pushes the array into the pull-through stream's queue once a certain number of items (the batch size) has accumulated.
- The third function is the async-map stream of the pull-stream library, which is a through stream, similar to the map stream but with better performance. Its reduce function defaults to the function returned in builder/reduce.js. Its flow is as follows:

1) If the current number of leaf nodes is 1, its single flag is true, and the options are configured to reduce a single leaf to itself, then the callback is invoked directly; otherwise, the following steps are executed.

```js
if (leaves.length === 1 && leaves[0].single && options.reduceSingleLeafToSelf) {
  const leaf = leaves[0]

  return callback(null, {
    size: leaf.size,
    leafSize: leaf.leafSize,
    multihash: leaf.multihash,
    path: file.path,
    name: leaf.name
  })
}
```
2) Create a parent node and add all of its leaf nodes. When a file is large, IPFS chunks it, and each chunk becomes one of the leaf nodes here. The leaves generate their corresponding DAGLinks in chunk order, which are added in turn to the parent DAGNode. The parent DAGNode stores not the file content itself, but the DAGLinks of these leaf nodes, which together form the complete content of the file.

```js
const f = new UnixFS('file')
const links = leaves.map((leaf) => {
  f.addBlockSize(leaf.leafSize)

  return new DAGLink(leaf.name, leaf.size, leaf.multihash)
})
```
3) Call the waterfall function to process the parent node in sequence. This is similar to processing a single chunk: create a DAGNode object and call the persist function to persist it. Note the difference here: the parent node has leaf nodes, i.e. links is not empty.

```js
waterfall([
  (cb) => DAGNode.create(f.marshal(), links, cb),
  (node, cb) => persist(node, ipld, options, cb)
], (error, result) => {
  if (error) {
    return callback(error)
  }

  callback(null, {
    size: result.node.size,
    leafSize: f.fileSize(),
    multihash: result.cid.buffer,
    path: file.path,
    name: ''
  })
})
```
4) After the above waterfall
function is processed, call the callback function to continue processing.
The callback passed to the reduce function here is the callback defined below in the collect stream, i.e. the sink stream. When the reduce function finishes its work, this callback is invoked, so the data is pulled into the collect stream and then enters the reduced function for processing.
- The fourth function is the collect stream of the pull-stream library, which is a sink stream. Its handler reduced proceeds as follows:

1) If there was an error in the preceding streams, the callback of the reduceToParents function is called directly to handle it;

2) Otherwise, if the length of the currently received data is greater than 1, i.e. after the previous round of reduction there are still multiple root DAGNodes, the reduceToParents function is called again to continue reducing;

3) Otherwise, the callback of the reduceToParents function is called to finish.
The callback of the reduceToParents function is very important: it writes the data it receives into the pull-pushable stream represented by result, so that the streams behind it in the external pull function can obtain the data. Finally, the function returns an object combining the two ends:

```js
{ sink: pair.sink, source: result }
```
Here, sink is the sink stream defined in the pull-pair library, used by the external pull function to read data from the previous stream; source is the pull-pushable stream, into which data is pushed in the callback of the reduceToParents function, so that the next stream in the external pull function can read the data from it.
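Putting the pieces of the reducer together, here is a minimal sketch of the duplex pattern just described: pull-pair provides the sink, pull-pushable provides the source, and reduceToParents recurses until a single root remains. reduce is the function from builder/reduce.js, and batch(n) (from pull-batch) is assumed to group items into arrays of at most n elements; this is a sketch, not the actual balanced-reducer source.

```js
const pull = require('pull-stream')
const pullPair = require('pull-pair')
const pushable = require('pull-pushable')
const batch = require('pull-batch')

function balancedReducer (reduce, options) {
  const pair = pullPair()
  const result = pushable()

  function reduceToParents (source, cb) {
    pull(
      source,
      batch(options.maxChildrenPerNode), // group leaves for one parent
      pull.asyncMap(reduce),             // build + persist a parent DAGNode
      pull.collect((err, roots) => {
        if (err) { return cb(err) }

        if (roots.length > 1) {
          // Still more than one root: reduce another level of the tree.
          reduceToParents(pull.values(roots), cb)
        } else {
          cb(null, roots)
        }
      })
    )
  }

  reduceToParents(pair.source, (err, roots) => {
    if (err) { return result.end(err) }

    result.push(roots[0]) // the final root DAGNode
    result.end()
  })

  // sink consumes the leaves; source yields the final root.
  return { sink: pair.sink, source: result }
}
```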
- Finally, call the collect stream, and in its handler pass the result of saving the file to the external function.

```js
collect((err, roots) => {
  if (err) {
    callback(err)
  } else {
    callback(null, roots[0])
  }
})
```
The callback here is passed in when the createAndStoreFile function is called; the call site is in the builder/builder.js file. Let's briefly review the calling code:

```js
createAndStoreFile(item, (err, node) => {
  if (err) {
    return cb(err)
  }

  if (node) {
    source.push(node)
  }

  cb()
})
```
The anonymous callback function here is the callback mentioned above. In it, the result of saving the file is pushed into the source stream, thereby passing the data to the outermost pull stream.
At this point, we have completely analyzed the core process of saving files and content. Hopefully, reading it from beginning to end has been rewarding. Stay tuned for the next article.
Click to review:
Proficient in IPFS: IPFS saves content