Extending a Lake indexer with start options
The end result
By the end of this tutorial you'll have the code of a simple indexer built on top of NEAR Lake Framework that can start:
- from a specified block height (supported out of the box)
./target/release/indexer mainnet from-block 65359506
- from the latest final block of the network
./target/release/indexer mainnet from-latest
- from the last block it indexed before it was interrupted
./target/release/indexer mainnet from-interruption
Motivation
Figuring out whether your project needs an indexer and building one covers only one side of things: the development.
There is another important side: the maintenance. This involves:
- upgrading the indexer to the latest versions of its dependencies
- updating the indexer with the new features you've built
- doing some maintenance on the server you use
- dealing with incidents
- etc.
In almost all of the above cases you might want to start or restart your indexer not only from a specific block you provide, but also from the block where it stopped, or from the latest final block in the network.
NEAR Lake Framework doesn't provide such options, and that is intentional.
We want to keep the NEAR Lake Framework crate as narrow as possible. The library should do a single job and let you add any features you need, but outside of the crate itself.
Still, being able to start an indexer from the latest block, or from where it left off, can be very useful.
Also, during the April Data Platform Community Meeting we were asked whether we plan to add this feature to the library. We promised to create a tutorial showing how to do it on your own. So here it is.
Preparation
In this tutorial we won't focus on the indexer itself, but on the start options instead.
To keep the code samples simple, we're writing the entire application in a single file, src/main.rs.
Please don't take this as design advice; we do it only for simplicity.
Let's prepare a project with the base dependencies, so we can focus on the main goal of this tutorial.
Create a new Rust project
cargo new --bin indexer && cd indexer
Replace the content of the Cargo.toml file with this:
[package]
name = "indexer"
version = "0.1.0"
edition = "2021"
rust-version = "1.60.0"
[dependencies]
clap = { version = "3.1.6", features = ["derive"] }
futures = "0.3.5"
itertools = "0.9.0"
tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1" }
tracing = "0.1.13"
tracing-subscriber = "0.2.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
near-lake-framework = "0.3.0"
Replace the content of src/main.rs with this:
use clap::{Parser, Subcommand};
use futures::StreamExt;
use tracing_subscriber::EnvFilter;
// TODO: StartOptions
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    init_tracing();
    let opts = Opts::parse();
    // TODO: Config
    let stream = near_lake_framework::streamer(config);
    // Handle the incoming StreamerMessages one at a time
    let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
        .map(handle_streamer_message)
        .buffer_unordered(1usize);
    while let Some(_handle_message) = handlers.next().await {}
    Ok(())
}
async fn handle_streamer_message(
    streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
) {
    eprintln!(
        "{} / shards {}",
        streamer_message.block.header.height,
        streamer_message.shards.len()
    );
    // Remember the height of the block we've just handled (used by from-interruption)
    std::fs::write("last_indexed_block", streamer_message.block.header.height.to_string().as_bytes()).unwrap();
}
// Subscribes to near_lake_framework logs at the info level, plus any directives from RUST_LOG
fn init_tracing() {
    let mut env_filter = EnvFilter::new("near_lake_framework=info");
    if let Ok(rust_log) = std::env::var("RUST_LOG") {
        if !rust_log.is_empty() {
            for directive in rust_log.split(',').filter_map(|s| match s.parse() {
                Ok(directive) => Some(directive),
                Err(err) => {
                    eprintln!("Ignoring directive `{}`: {}", s, err);
                    None
                }
            }) {
                env_filter = env_filter.add_directive(directive);
            }
        }
    }
    tracing_subscriber::fmt::Subscriber::builder()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .init();
}
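This code is not going to build yet. Meanwhile, let's have a quick look at what we've copy/pasted so far:
- We've imported clap to set up the command-line arguments we're going to accept.
- There are also essential imports like futures and tracing_subscriber.
- init_tracing at the end of the file is a helper function that subscribes the application to the near-lake-framework logs.
- The async main function contains the indexer boilerplate, but it lacks the part that creates the LakeConfig.
- You'll find a few // TODO: ... markers showing where to put the code from this tutorial.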
OK, all the preparations are done. Let's move on.
Designing StartOptions
We want to be able to pass a command that defines how our indexer should start. In this tutorial we'll use clap for that.
First, we need a structure that receives the chain id. This will allow us to use the commands:
./target/release/indexer mainnet ...
OR
./target/release/indexer testnet ...
Let's replace the // TODO: StartOptions comment in src/main.rs with:
#[derive(Parser, Debug, Clone)]
#[clap(version = "0.1", author = "Near Inc. <hello@nearprotocol.com>")]
struct Opts {
    #[clap(subcommand)]
    pub chain_id: ChainId,
}
#[derive(Subcommand, Debug, Clone)]
enum ChainId {
    #[clap(subcommand)]
    Mainnet(StartOptions),
    #[clap(subcommand)]
    Testnet(StartOptions),
}
Now we want to create a StartOptions enum that tells our indexer where to start indexing from. The command should look like:
./target/release/indexer mainnet from-latest
Our variants are:
- from-block N (where N is the block height to start from)
- from-latest (start from the latest final block of the network)
- from-interruption (start from the block where the indexer was previously interrupted)
Let's add the enum right below ChainId:
#[derive(Subcommand, Debug, Clone)]
pub(crate) enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}
Pretty simple and straightforward, right?
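To make the command shape concrete, here is a hand-rolled sketch of what the clap derive above accepts. parse_args is a hypothetical helper for illustration only: the tutorial relies on clap, which also generates help text and error messages for you. The sketch assumes the arguments after the binary name arrive as a slice of &str.

```rust
// Hypothetical, std-only parser mirroring the Opts/ChainId/StartOptions command shape.
// Not part of the tutorial's code; clap's derive does this work there.

#[derive(Debug, Clone, PartialEq)]
enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}

#[derive(Debug, Clone, PartialEq)]
enum ChainId {
    Mainnet(StartOptions),
    Testnet(StartOptions),
}

/// Parses e.g. `["mainnet", "from-block", "65359506"]` (everything after the binary name).
fn parse_args(args: &[&str]) -> Result<ChainId, String> {
    let (chain, rest) = args.split_first().ok_or("missing chain id")?;
    // The subcommand after the chain id selects the start option
    let start_options = match rest {
        ["from-block", height] => StartOptions::FromBlock {
            height: height
                .parse::<u64>()
                .map_err(|e| format!("invalid height `{}`: {}", height, e))?,
        },
        ["from-latest"] => StartOptions::FromLatest,
        ["from-interruption"] => StartOptions::FromInterruption,
        _ => return Err(format!("unexpected start options: {:?}", rest)),
    };
    match *chain {
        "mainnet" => Ok(ChainId::Mainnet(start_options)),
        "testnet" => Ok(ChainId::Testnet(start_options)),
        other => Err(format!("unknown chain id `{}`", other)),
    }
}
```

Each ChainId variant wraps its own StartOptions, which is why the chain id comes first on the command line and the start option second.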
Creating LakeConfig
To create the LakeConfig we're going to use the config builder, LakeConfigBuilder. Fortunately, it's already available through the near-lake-framework dependency, so there's nothing extra to import.
Let's instantiate a builder in place of the // TODO: Config comment:
let mut lake_config_builder = near_lake_framework::LakeConfigBuilder::default();
Notice that lake_config_builder is defined as mutable, because we're going to reassign it with the configured builder.
Now we need to set the chain we are going to index by matching the provided ChainId:
let mut lake_config_builder = near_lake_framework::LakeConfigBuilder::default();
match &opts.chain_id {
    ChainId::Mainnet(_) => {
        lake_config_builder = lake_config_builder
            .mainnet();
    }
    ChainId::Testnet(_) => {
        lake_config_builder = lake_config_builder
            .testnet();
    }
}
As you can see, depending on the variant of the ChainId enum, we modify lake_config_builder with one of the shortcuts, mainnet() or testnet().
The only parameter left to set is the one that matters most for this tutorial: start_block_height. Normally we would just pass a block height as a u64, but here we're implementing the start options instead.
Start options logic
Let's create a separate function, get_start_block_height, that will hold the logic of identifying the start_block_height.
Just read the code below, don't copy it: it's not the final approach yet.
FromBlock { height: u64 }
Let's start by implementing from-block N, the simplest one:
async fn get_start_block_height(start_options: &StartOptions) -> u64 {
    match start_options {
        // `height` is a &u64 here, so we dereference it
        StartOptions::FromBlock { height } => *height,
        _ => todo!("the other start options come next"),
    }
}
OK, that's simple enough. What about the other match arms of StartOptions?
async fn get_start_block_height(start_options: &StartOptions) -> u64 {
    match start_options {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => todo!("how do we get the latest final block?"),
        StartOptions::FromInterruption => todo!(),
    }
}
Er, how should we get the latest block from the network? We can query the JSON RPC for the final block, extract its height and call it a day.
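For reference, the RPC request behind this is small. The sketch below only builds the JSON-RPC 2.0 body for NEAR's block method with finality set to final, assuming a plain string id that needs no JSON escaping. final_block_request_body is a hypothetical helper; the client crate introduced next is what actually sends an equivalent request for you.

```rust
/// Hypothetical helper: JSON-RPC 2.0 body asking NEAR's `block` method for the
/// latest block with `final` finality. Assumes `id` needs no JSON escaping.
fn final_block_request_body(id: &str) -> String {
    // `{{` and `}}` are escaped braces inside a format string
    format!(
        r#"{{"jsonrpc":"2.0","id":"{}","method":"block","params":{{"finality":"final"}}}}"#,
        id
    )
}
```

POSTing this body to https://rpc.mainnet.near.org with Content-Type: application/json returns the final block, and its height sits at result.header.height in the response. That handy for checking by hand what the indexer should start from.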
FromLatest
To query the JSON RPC from Rust code, we'll use the near-jsonrpc-client-rs crate.
You can find a bunch of useful examples in the corresponding folder of the project's repository on GitHub.
Add it to the end of the [dependencies] section in Cargo.toml:
near-jsonrpc-client = "0.3.0"
The code for getting the final block height would look like the following:
use near_jsonrpc_client::{methods, JsonRpcClient};
use near_lake_framework::near_indexer_primitives::types::{BlockReference, Finality};
async fn final_block_height() -> u64 {
    let client = JsonRpcClient::connect("https://rpc.mainnet.near.org");
    // Ask for the latest block that has reached `final` finality
    let request = methods::block::RpcBlockRequest {
        block_reference: BlockReference::Finality(Finality::Final),
    };
    let latest_block = client.call(request).await.unwrap();
    latest_block.header.height
}
Nice and easy. However, the hardcoded "https://rpc.mainnet.near.org" doesn't look great, especially since we want to support both networks.
We could handle it by passing the JSON RPC URL to the get_start_block_height function like this:
async fn get_start_block_height(
    start_options: &StartOptions,
    rpc_url: &str,
) -> u64 {
    ...
}
...
match &opts.chain_id {
    ChainId::Mainnet(start_options) => {
        lake_config_builder = lake_config_builder
            .mainnet()
            .start_block_height(
                get_start_block_height(
                    start_options,
                    "https://rpc.mainnet.near.org",
                ).await
            );
    }
    ChainId::Testnet(start_options) => {
        lake_config_builder = lake_config_builder
            .testnet()
            .start_block_height(
                get_start_block_height(
                    start_options,
                    "https://rpc.testnet.near.org",
                ).await
            );
    }
}
Meh. It's ugly, and why should we pass the URL every time if it's needed in only one of the three cases?
Instead, we can pass the entire Opts to the get_start_block_height function.
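async fn get_start_block_height(opts: &Opts) -> u64 {
    match &opts.chain_id {
        ChainId::Mainnet(start_options) => {
            match start_options {
                StartOptions::FromBlock { height } => *height,
                StartOptions::FromLatest => todo!(),
                StartOptions::FromInterruption => todo!(),
            }
        }
        // ...and the very same nested match again for ChainId::Testnet
        ChainId::Testnet(_) => todo!(),
    }
}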
At least we have everything we need. However, it still looks ugly and would definitely lead to code duplication.
What we propose instead is to create an impl Opts block with a few useful methods: one to get the JSON RPC URL and one to get the StartOptions instance.
From here on you can safely copy the code.
Somewhere under the StartOptions definition, add the following:
impl Opts {
    /// JSON RPC endpoint of the selected network
    pub fn rpc_url(&self) -> &str {
        match self.chain_id {
            ChainId::Mainnet(_) => "https://rpc.mainnet.near.org",
            ChainId::Testnet(_) => "https://rpc.testnet.near.org",
        }
    }
    /// Start options, regardless of the selected network
    pub fn start_options(&self) -> &StartOptions {
        match &self.chain_id {
            ChainId::Mainnet(args) | ChainId::Testnet(args) => args,
        }
    }
}
Now we can write our get_start_block_height function, along with final_block_height, a helper that queries the final block (we're going to reuse it later, so keep an eye on it):
async fn get_start_block_height(opts: &Opts) -> u64 {
    match opts.start_options() {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => final_block_height(opts.rpc_url()).await,
        // a placeholder
        StartOptions::FromInterruption => 0,
    }
}
async fn final_block_height(rpc_url: &str) -> u64 {
    let client = JsonRpcClient::connect(rpc_url);
    let request = methods::block::RpcBlockRequest {
        block_reference: BlockReference::Finality(Finality::Final),
    };
    let latest_block = client.call(request).await.unwrap();
    latest_block.header.height
}
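You may have noticed the FromInterruption arm and the placeholder comment. We've added it so that we can build the application right now and test that FromLatest works as expected.
To wire this into main, set .start_block_height(get_start_block_height(&opts).await) on the builder in both ChainId arms, and build the config right after the match with let config = lake_config_builder.build().expect("Failed to build LakeConfig"); so that near_lake_framework::streamer(config) receives it.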
Testing FromLatest
Please ensure you have the AWS credentials set up as described on the Credentials page. Otherwise you won't be able to get the code working.
Let's build and run our code:
cargo build --release
./target/release/indexer mainnet from-latest
Once the code is built, you should see something like this in your terminal:
65364116 / shards 4
65364117 / shards 4
65364118 / shards 4
65364119 / shards 4
65364120 / shards 4
You can stop it by pressing CTRL+C
And now we can move on to FromInterruption
FromInterruption
To let an indexer know at which block it was interrupted, the indexer needs to store the block height somewhere. It should do so at the end of the handle_streamer_message function.
In the boilerplate code you copy/pasted at the beginning of this tutorial, you may have noticed this line:
std::fs::write("last_indexed_block", streamer_message.block.header.height.to_string().as_bytes()).unwrap();
It saves the last indexed block height into the file last_indexed_block in the current working directory (the project root, if you run the commands above from there).
In a real-world indexer you'd probably use some other storage, depending on the toolset you're using.
But to show you the concept, we've gone with the easiest approach: saving it to a file.
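If you do stick with a file, one optional hardening step (our suggestion, not part of the original tutorial) is to make each save atomic, so an interruption in the middle of a write can't leave a truncated last_indexed_block behind. A minimal sketch, assuming the checkpoint lives on a local filesystem: write to a temporary sibling file, then rename it over the checkpoint. save_last_indexed_block is a hypothetical helper.

```rust
use std::io;
use std::path::Path;

/// Hypothetical helper: saves `height` to `path` via a temporary sibling file.
/// On typical local POSIX filesystems, `rename` within one directory replaces the
/// target atomically, so a reader sees either the old height or the new one.
fn save_last_indexed_block(path: &Path, height: u64) -> io::Result<()> {
    // e.g. "last_indexed_block" -> "last_indexed_block.tmp"
    let tmp = path.with_extension("tmp");
    std::fs::write(&tmp, height.to_string())?;
    std::fs::rename(&tmp, path)
}
```

In handle_streamer_message you would then call save_last_indexed_block(Path::new("last_indexed_block"), streamer_message.block.header.height) in place of the std::fs::write line, handling the io::Result as you see fit.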
Now we need to implement reading the value from the file.
If it's the first start of your indexer and you ask it to start from interruption, it won't be able to find last_indexed_block and would simply fail.
That's not the behavior we want. That's why we assume you want it to start from interruption if possible, and from the latest final block otherwise.
Let's finish up our get_start_block_height:
async fn get_start_block_height(opts: &Opts) -> u64 {
    match opts.start_options() {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => final_block_height(opts.rpc_url()).await,
        StartOptions::FromInterruption => {
            match &std::fs::read("last_indexed_block") {
                Ok(contents) => {
                    // The file holds the height as text, e.g. "9820238"
                    String::from_utf8_lossy(contents).trim().parse().unwrap()
                }
                Err(e) => {
                    eprintln!("Cannot read last_indexed_block.\n{}\nStart indexer from latest final", e);
                    // Fallback: start from the latest final block instead
                    final_block_height(opts.rpc_url()).await
                }
            }
        }
    }
}
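What we are doing here:
- We try to read the last_indexed_block file.
- If the Result is Ok, we parse its contents as the block height to start from.
- If the Result is Err, we print a message about the error and call final_block_height to get the final block from the network (the fallback we talked about earlier).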
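The same decision logic can be exercised without the network or an async runtime. Below is a synchronous, std-only sketch, assuming the JSON RPC call is replaced by a caller-supplied closure (final_block_height here is a parameter, not the async function above) and the checkpoint path is passed in explicitly. resolve_start_block_height and read_last_indexed_block are hypothetical names. Unlike the tutorial's unwrap(), this sketch also falls back to the latest block when the file exists but doesn't contain a valid height.

```rust
use std::path::Path;

#[derive(Debug, Clone, PartialEq)]
enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}

/// Hypothetical helper: returns the height stored in `path`, or `None` if the
/// file is missing, unreadable, or doesn't contain a valid `u64`.
fn read_last_indexed_block(path: &Path) -> Option<u64> {
    let contents = std::fs::read_to_string(path).ok()?;
    contents.trim().parse().ok()
}

/// Hypothetical, synchronous stand-in for `get_start_block_height`.
/// `final_block_height` replaces the JSON RPC call with a caller-supplied closure.
fn resolve_start_block_height(
    start_options: &StartOptions,
    checkpoint: &Path,
    final_block_height: impl Fn() -> u64,
) -> u64 {
    match start_options {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => final_block_height(),
        StartOptions::FromInterruption => match read_last_indexed_block(checkpoint) {
            Some(height) => height,
            None => {
                // First run (or a corrupt file): fall back to the latest final block
                eprintln!(
                    "Cannot read {}; starting from the latest final block",
                    checkpoint.display()
                );
                final_block_height()
            }
        },
    }
}
```

Note that the stored height is used as the start height as-is, so the block recorded last is handled once more after a restart; add 1 in the Some arm if you'd rather skip it.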
Testing FromInterruption
To make sure everything works as expected, we'll start the indexer from genesis so that it stores the last indexed block. Then we'll start it from interruption to make sure it doesn't start from the latest block.
Let's build and run from genesis.
To start a NEAR Lake Framework based indexer from the genesis block, just specify the start_block_height as 0.
cargo build --release
./target/release/indexer mainnet from-block 0
You will see something like this (on mainnet, the genesis block height is 9820210):
9820210 / shards 1
9820214 / shards 1
9820216 / shards 1
9820219 / shards 1
9820221 / shards 1
9820226 / shards 1
9820228 / shards 1
9820230 / shards 1
9820231 / shards 1
9820232 / shards 1
9820233 / shards 1
9820235 / shards 1
9820236 / shards 1
9820237 / shards 1
9820238 / shards 1
Stop it by pressing CTRL+C
Remember the last block height you see. In our example it's 9820238.
Restart the indexer from interruption:
./target/release/indexer mainnet from-interruption
You should see the indexer logs beginning from the block you've memorized.
Perfect! That's it. Now you can adapt the resulting code to your needs and use it in your own indexers.
Summary
You've seen how to extend your indexer with start options. As you can see, there's nothing complex here.
You can find the source code in the near-examples/lake-indexer-start-options repository.