Extending Lake indexer with start options
The end
This tutorial ends with example code for a simple indexer built on top of NEAR Lake Framework that can start:
- from a specified block height (supported out of the box)
./target/release/indexer mainnet from-block 65359506
- from the latest final block in the network
./target/release/indexer mainnet from-latest
- from the block it indexed last before it was interrupted
./target/release/indexer mainnet from-interruption
Motivation
Finding out whether you need an indexer for your project and creating one covers only one side of things: the development.
There is another important side - the maintenance. This involves:
- the indexer needs to be upgraded to newer versions of its dependencies
- the indexer needs to be updated with new features you've built
- the server you use needs some maintenance
- an incident happened
- etc.
In almost all of the cases above you might want to start or restart your indexer not only from a specific block that you provide, but also from the block where it was stopped, or from the latest final block in the network.
NEAR Lake Framework doesn't provide such options. We intentionally didn't add these start options to the library.
We want to keep the NEAR Lake Framework crate as narrow as possible. The goal is for the library to do a single job and to allow it to be extended with any features outside of the crate itself.
Still, the ability to start the indexer from the latest block, or from the block after the last one it indexed, can be very useful.
Also, during the April Data Platform Community Meeting we were asked whether we plan to add this feature to the library. We promised to create a tutorial showing how to do it on your own. So here it is.
Preparation
In this tutorial we're not going to focus our attention on the indexer itself, but on the start options instead.
To simplify the code samples in the tutorial, we're writing the entire application in a single file, src/main.rs.
Please don't take this as design advice. We do it only for simplicity.
Let's prepare a project with the base dependencies, so we can focus on the main goal of this tutorial.
Create a new Rust project
cargo new --bin indexer && cd indexer
Replace the content of the Cargo.toml file with this:
[package]
name = "indexer"
version = "0.1.0"
edition = "2021"
rust-version = "1.60.0"
[dependencies]
clap = { version = "3.1.6", features = ["derive"] }
futures = "0.3.5"
itertools = "0.9.0"
tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1" }
tracing = "0.1.13"
tracing-subscriber = "0.2.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"
near-lake-framework = "0.3.0"
Replace the content of src/main.rs with this:
use clap::{Parser, Subcommand};
use futures::StreamExt;
use tracing_subscriber::EnvFilter;
// TODO: StartOptions
#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    init_tracing();
    let opts = Opts::parse();
    // TODO: Config
    let stream = near_lake_framework::streamer(config);
    let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
        .map(handle_streamer_message)
        .buffer_unordered(1usize);
    while let Some(_handle_message) = handlers.next().await {}
    Ok(())
}
async fn handle_streamer_message(
    streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
) {
    eprintln!(
        "{} / shards {}",
        streamer_message.block.header.height,
        streamer_message.shards.len()
    );
    std::fs::write(
        "last_indexed_block",
        streamer_message.block.header.height.to_string().as_bytes(),
    )
    .unwrap();
}
fn init_tracing() {
    let mut env_filter = EnvFilter::new("near_lake_framework=info");
    if let Ok(rust_log) = std::env::var("RUST_LOG") {
        if !rust_log.is_empty() {
            for directive in rust_log.split(',').filter_map(|s| match s.parse() {
                Ok(directive) => Some(directive),
                Err(err) => {
                    eprintln!("Ignoring directive `{}`: {}", s, err);
                    None
                }
            }) {
                env_filter = env_filter.add_directive(directive);
            }
        }
    }
    tracing_subscriber::fmt::Subscriber::builder()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .init();
}
This code is not going to build yet. Meanwhile, let's have a quick look at what we've copied and pasted so far:
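- We've imported clap to set up the command-line arguments we accept.
- There are also other important necessities such as futures and tracing_subscriber.
- init_tracing at the end of the file is a helper function that subscribes the application to the logs of near-lake-framework.
- The asynchronous main function contains the indexer boilerplate code but lacks the LakeConfig creation part.
- You'll find a few sections marked with // TODO: ... so you can see where to write the code from this tutorial.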
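The handle_streamer_message function above writes the height of every processed block to a last_indexed_block file. Below is a minimal, standard-library-only sketch of that write and of reading the value back. The helper names save_last_indexed_block and read_last_indexed_block and the path parameter are hypothetical and are not part of the tutorial's code:

```rust
use std::fs;
use std::path::Path;

/// Hypothetical helper: persist the height of the last processed block,
/// in the same textual form the tutorial's handler writes it.
fn save_last_indexed_block(path: &Path, height: u64) -> std::io::Result<()> {
    fs::write(path, height.to_string().as_bytes())
}

/// Hypothetical helper: read the stored height back.
/// Returns None if the file is missing or does not contain a valid u64.
fn read_last_indexed_block(path: &Path) -> Option<u64> {
    let content = fs::read_to_string(path).ok()?;
    content.trim().parse::<u64>().ok()
}
```

Returning an Option instead of panicking leaves it to the caller to decide what to do when no block has been indexed yet.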
OK, all the preparations are done. Let's move on.
Design the StartOptions
So we want to be able to pass a command that defines how our indexer should start. In this tutorial we'll be using clap.
We need a structure that receives the chain id. This will allow us to use the command:
./target/release/indexer mainnet ...
OR
./target/release/indexer testnet ...
Let's add the following above the // TODO: StartOptions comment in src/main.rs (keep the comment for now):
#[derive(Parser, Debug, Clone)]
#[clap(version = "0.1", author = "Near Inc. <hello@nearprotocol.com>")]
struct Opts {
    #[clap(subcommand)]
    pub chain_id: ChainId,
}
#[derive(Subcommand, Debug, Clone)]
enum ChainId {
    #[clap(subcommand)]
    Mainnet(StartOptions),
    #[clap(subcommand)]
    Testnet(StartOptions),
}
Now we want to create a StartOptions structure that will allow us to tell our indexer where to start indexing from. The command should look like:
./target/release/indexer mainnet from-latest
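Our variants are:
- from-block N (N is the block height to start from)
- from-latest (start from the latest final block in the network)
- from-interruption (start from the block the indexer was on when it was previously interrupted)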
Let's replace the comment // TODO: StartOptions with the enum:
#[derive(Subcommand, Debug, Clone)]
pub(crate) enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}
Pretty simple and straightforward, agree?
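To make the mapping between the command line and the enum explicit, here is a rough sketch using only the standard library: kebab-case subcommand names select the variant, and from-block takes the height as a positional value. The parse_start_options function is hypothetical and is not how clap works internally. The enum is repeated without the clap attributes so that the snippet stands alone:

```rust
#[derive(Debug, Clone, PartialEq)]
enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}

/// Hypothetical hand-rolled parser for the arguments that follow the chain id,
/// e.g. ["from-block", "65359506"] or ["from-latest"].
fn parse_start_options(args: &[&str]) -> Result<StartOptions, String> {
    match args {
        // `from-block N`: N must be a valid u64 block height
        ["from-block", height] => height
            .parse::<u64>()
            .map(|height| StartOptions::FromBlock { height })
            .map_err(|err| format!("invalid block height `{}`: {}", height, err)),
        ["from-latest"] => Ok(StartOptions::FromLatest),
        ["from-interruption"] => Ok(StartOptions::FromInterruption),
        other => Err(format!("unrecognized start option: {:?}", other)),
    }
}
```

In the real indexer, the derive macros generate the parsing, help output, and error messages, so nothing like this needs to be written by hand.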
Creating LakeConfig
In order to create LakeConfig we're going to use the config builder LakeConfigBuilder. Fortunately, it comes with the near-lake-framework crate that is already in our dependencies.
Let's instantiate a builder in place of the // TODO: Config comment:
let mut lake_config_builder = near_lake_framework::LakeConfigBuilder::default();
Notice that lake_config_builder is defined as mutable.
Now we need to set the chain we are going to index by matching the provided ChainId:
let mut lake_config_builder = near_lake_framework::LakeConfigBuilder::default();
match &opts.chain_id {
    ChainId::Mainnet(start_options) => {
        lake_config_builder = lake_config_builder
            .mainnet();
    }
    ChainId::Testnet(start_options) => {
        lake_config_builder = lake_config_builder
            .testnet();
    }
}
As you can see, depending on the variant of the ChainId enum we modify the lake_config_builder with one of the shortcuts mainnet() or testnet().
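The reassignment in the snippet above suggests that the builder methods take the builder by value and return it, which is why the variable has to be mutable. A toy sketch of that pattern, under that assumption: ToyConfigBuilder, its fields, and build_toy are hypothetical stand-ins and not the real LakeConfigBuilder.

```rust
/// Hypothetical stand-in for a config builder whose methods consume `self`.
#[derive(Debug, Default, Clone, PartialEq)]
struct ToyConfigBuilder {
    network: Option<String>,
    start_block_height: Option<u64>,
}

impl ToyConfigBuilder {
    /// Shortcut that selects mainnet and hands the builder back.
    fn mainnet(mut self) -> Self {
        self.network = Some("mainnet".to_string());
        self
    }

    /// Shortcut that selects testnet and hands the builder back.
    fn testnet(mut self) -> Self {
        self.network = Some("testnet".to_string());
        self
    }

    /// Sets the block height to start from.
    fn start_block_height(mut self, height: u64) -> Self {
        self.start_block_height = Some(height);
        self
    }
}

/// Each call moves the builder, so the result is assigned back to the variable.
fn build_toy(is_mainnet: bool, height: u64) -> ToyConfigBuilder {
    let mut builder = ToyConfigBuilder::default();
    builder = if is_mainnet { builder.mainnet() } else { builder.testnet() };
    builder = builder.start_block_height(height);
    builder
}
```

Because every method returns the builder, calls can also be chained, as in builder.mainnet().start_block_height(height).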
The only parameter left to set is the one that matters most for this tutorial: start_block_height.
Normally we would just pass the block height as a u64, but here we're implementing the start options.
Start options logic
Let's create a separate function that will hold the logic for determining the start_block_height and call it get_start_block_height.
Just read the code, don't copy it. It's not the final approach yet.
FromBlock { height: u64 }
Let's start by implementing from-block N as the simplest one:
async fn get_start_block_height(start_options: &StartOptions) -> u64 {
    match start_options {
        // `start_options` is a reference, so `height` is a `&u64` and must be dereferenced
        StartOptions::FromBlock { height } => *height,
    }
}
OK, that's simple enough. What about the other match arms for StartOptions?
async fn get_start_block_height(start_options: &StartOptions) -> u64 {
    match start_options {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest =>
    }
}
Er, how should we get the latest block from the network? We should query the JSON RPC and get the final block, extract its height and call it a day.
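For reference, the query described above is a JSON-RPC 2.0 call of the block method with finality set to final. The sketch below only builds the request body as a string and does not send it. The function name final_block_request_body and the id value are illustrative assumptions, and the id is assumed to contain no characters that need JSON escaping:

```rust
/// Hypothetical helper: builds the JSON-RPC request body that asks
/// for the latest final block. Sending the request is left out.
fn final_block_request_body(id: &str) -> String {
    // Doubled braces are literal braces inside format!
    format!(
        r#"{{"jsonrpc":"2.0","id":"{}","method":"block","params":{{"finality":"final"}}}}"#,
        id
    )
}
```

The height we're after is found in the response under result.header.height.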
FromLatest
In order to query the JSON RPC from within Rust code we need the near-jsonrpc-client crate (developed in the near-jsonrpc-client-rs repository).
You can find a bunch of useful examples in the corresponding folder of the project's repository on GitHub.
Add it to the end of Cargo.toml:
near-jsonrpc-client = "0.3.0"
The code for getting the final block height would look like the following:
use near_jsonrpc_client::{methods, JsonRpcClient};
use near_lake_framework::near_indexer_primitives::types::{BlockReference, Finality};
async fn final_block_height() -> u64 {
    let client = JsonRpcClient::connect("https://rpc.mainnet.near.org");
    let request = methods::block::RpcBlockRequest {
        block_reference: BlockReference::Finality(Finality::Final),
    };
    let latest_block = client.call(request).await.unwrap();
    latest_block.header.height
}
Nice and easy. However, the hardcoded value "https://rpc.mainnet.near.org" doesn't look great, especially when we want to support both networks.
We can handle it by passing the JSON RPC URL to the get_start_block_height function like this:
async fn get_start_block_height(
    start_options: &StartOptions,
    rpc_url: &str,
) -> u64 {
    ...
}
...
match &opts.chain_id {
    ChainId::Mainnet(start_options) => {
        lake_config_builder = lake_config_builder
            .mainnet()
            .start_block_height(
                get_start_block_height(
                    start_options,
                    "https://rpc.mainnet.near.org",
                ).await
            );
    }
    ChainId::Testnet(start_options) => {
        lake_config_builder = lake_config_builder
            .testnet()
            .start_block_height(
                get_start_block_height(
                    start_options,
                    "https://rpc.testnet.near.org",
                ).await
            );
    }
}
Meh. It's ugly, and why should we pass the URL every time if it is required in only one of the three possible cases?
Instead, we can pass the entire Opts to the get_start_block_height function.
async fn get_start_block_height(opts: &Opts) -> u64 {
    // Match on a reference so nothing is moved out of the borrowed `opts`
    match &opts.chain_id {
        ChainId::Mainnet(start_options) => {
            match start_options {
                StartOptions::FromBlock { height } => *height,
                StartOptions::FromLatest =>
            }
        }
    }
}