Extending a Lake Indexer with Start Options

By the end of this tutorial you'll have the example code of a simple indexer built on top of NEAR Lake Framework that can start:

  • from a specified block height (out of the box)
    ./target/release/indexer mainnet from-block 65359506
  • from the latest final block of the network
    ./target/release/indexer mainnet from-latest
  • from the block it indexed last before being interrupted
    ./target/release/indexer mainnet from-interruption

Goal

Deciding whether your project needs an indexer and then building one covers only one side of things: development.

There is another important side: maintenance. This involves:

  • upgrading the indexer to the latest versions of its dependencies
  • updating the indexer with new features you've built
  • maintaining the server it runs on
  • handling incidents
  • etc.

In almost all of the above cases you might want to start or restart your indexer not only from a specific block you provide, but also from the block where it stopped, or from the latest final block in the network.

NEAR Lake Framework doesn't provide such options, and we left them out of the library on purpose.

Intent

We want to keep the NEAR Lake Framework crate as narrow as possible. The library should do a single job and let you extend it with any features you need, but outside of the crate itself.

Still, being able to start the indexer from the latest block, or from the block where it stopped last time, can be very useful.

During the April Data Platform Community Meeting we were asked whether we plan to add this feature to the library. We promised to write a tutorial showing how to do it on your own, so here it is.

Preparation

In this tutorial we're not going to focus our attention on the indexer itself, but on the start options instead.

Note

To simplify the code samples in this tutorial, we write the entire application in a single file, src/main.rs.

Please don't take this as design advice; we do it only for simplicity.

Let's prepare a project with the base dependencies so we can focus on the main goal of this tutorial.

Create a new Rust project

cargo new --bin indexer && cd indexer

Replace the content of the Cargo.toml file with this:

[package]
name = "indexer"
version = "0.1.0"
edition = "2021"
rust-version = "1.60.0"

[dependencies]
clap = { version = "3.1.6", features = ["derive"] }
futures = "0.3.5"
itertools = "0.9.0"
tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
tokio-stream = { version = "0.1" }
tracing = "0.1.13"
tracing-subscriber = "0.2.4"
serde = { version = "1", features = ["derive"] }
serde_json = "1.0.55"

near-lake-framework = "0.3.0"

Replace the content of src/main.rs with this:

use clap::{Parser, Subcommand};
use futures::StreamExt;
use tracing_subscriber::EnvFilter;

// TODO: StartOptions

#[tokio::main]
async fn main() -> Result<(), tokio::io::Error> {
    init_tracing();

    let opts = Opts::parse();

    // TODO: Config

    let stream = near_lake_framework::streamer(config);

    let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
        .map(handle_streamer_message)
        .buffer_unordered(1usize);

    while let Some(_handle_message) = handlers.next().await {}

    Ok(())
}

async fn handle_streamer_message(
    streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
) {
    eprintln!(
        "{} / shards {}",
        streamer_message.block.header.height,
        streamer_message.shards.len()
    );
    std::fs::write("last_indexed_block", streamer_message.block.header.height.to_string().as_bytes()).unwrap();
}

fn init_tracing() {
    let mut env_filter = EnvFilter::new("near_lake_framework=info");

    if let Ok(rust_log) = std::env::var("RUST_LOG") {
        if !rust_log.is_empty() {
            for directive in rust_log.split(',').filter_map(|s| match s.parse() {
                Ok(directive) => Some(directive),
                Err(err) => {
                    eprintln!("Ignoring directive `{}`: {}", s, err);
                    None
                }
            }) {
                env_filter = env_filter.add_directive(directive);
            }
        }
    }

    tracing_subscriber::fmt::Subscriber::builder()
        .with_env_filter(env_filter)
        .with_writer(std::io::stderr)
        .init();
}

This code won't build yet. Meanwhile, let's take a quick look at what we've copy-pasted:

  • We've imported clap to set up the command-line arguments we want to accept.
  • There are also important essentials like futures and tracing_subscriber.
  • init_tracing at the end of the file is a helper function that subscribes our application to the logs of near-lake-framework.
  • The async main function holds the indexer boilerplate but is missing the part that creates LakeConfig.
  • You'll find a few // TODO: ... comments marking where to write the code from this tutorial.
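The RUST_LOG handling in init_tracing follows a simple pattern: split the variable on commas, keep every directive that parses, and print a warning for the rest. Below is a dependency-free sketch of that loop. It is hypothetical and greatly simplified: `Directive`, `parse_directive` and `collect_directives` exist only for illustration, and only the `level` and `target=level` forms are recognized, while the real tracing_subscriber directive syntax is richer.

```rust
// Simplified, hypothetical stand-in for tracing_subscriber's Directive:
// only `level` and `target=level` are understood here.
#[derive(Debug, Clone, PartialEq)]
struct Directive {
    target: Option<String>,
    level: String,
}

const LEVELS: [&str; 6] = ["off", "error", "warn", "info", "debug", "trace"];

fn parse_directive(s: &str) -> Result<Directive, String> {
    let (target, level) = match s.split_once('=') {
        Some((target, level)) => (Some(target.to_string()), level),
        None => (None, s),
    };
    let level = level.to_ascii_lowercase();
    if LEVELS.contains(&level.as_str()) {
        Ok(Directive { target, level })
    } else {
        Err(format!("unknown level `{}`", level))
    }
}

// Mirrors init_tracing: start from `near_lake_framework=info`, then add every
// comma-separated directive from RUST_LOG that parses, warning about the rest.
fn collect_directives(rust_log: &str) -> Vec<Directive> {
    let mut directives = vec![Directive {
        target: Some("near_lake_framework".to_string()),
        level: "info".to_string(),
    }];
    if !rust_log.is_empty() {
        for s in rust_log.split(',') {
            match parse_directive(s) {
                Ok(directive) => directives.push(directive),
                Err(err) => eprintln!("Ignoring directive `{}`: {}", s, err),
            }
        }
    }
    directives
}

fn main() {
    let rust_log = std::env::var("RUST_LOG").unwrap_or_default();
    for directive in collect_directives(&rust_log) {
        println!("{:?}", directive);
    }
}
```

Running it with, say, RUST_LOG=indexer=debug,bogus=loud keeps the default near_lake_framework=info directive plus indexer=debug, and prints an "Ignoring directive" warning for bogus=loud.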

OK, all the preparations are done. Let's move on.

Designing StartOptions

So we want to be able to pass a command that defines the way our indexer should start. In this tutorial we'll be using clap.

We need a structure that receives the chain id. This will allow us to use commands like:

./target/release/indexer mainnet ...

OR

./target/release/indexer testnet ...

Let's replace the // TODO: StartOptions in the src/main.rs with:

#[derive(Parser, Debug, Clone)]
#[clap(version = "0.1", author = "Near Inc. <hello@nearprotocol.com>")]
struct Opts {
    #[clap(subcommand)]
    pub chain_id: ChainId,
}

#[derive(Subcommand, Debug, Clone)]
enum ChainId {
    #[clap(subcommand)]
    Mainnet(StartOptions),
    #[clap(subcommand)]
    Testnet(StartOptions),
}

Now we want to create a StartOptions structure that will allow us to tell our indexer where to start indexing from. The command should look like:

./target/release/indexer mainnet from-latest

Our variants are:

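  • from-block N (where N is the block height to start from)
  • from-latest (start from the latest final block in the network)
  • from-interruption (start from the block at which the indexer was previously interrupted)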

Let's add the StartOptions enum right below ChainId:

#[derive(Subcommand, Debug, Clone)]
pub(crate) enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}

Pretty simple and straightforward, isn't it?
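clap's derive macros generate all of the argument parsing for these enums. To make the mapping from command-line words to values explicit, here is a hand-written, dependency-free sketch of what that parsing amounts to. `parse_args` and `parse_start_options` are hypothetical helpers, not code clap generates, and clap additionally handles help output, version flags and error messages.

```rust
#[derive(Debug, Clone, PartialEq)]
enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}

#[derive(Debug, Clone, PartialEq)]
enum ChainId {
    Mainnet(StartOptions),
    Testnet(StartOptions),
}

// Hypothetical: turns `from-block N`, `from-latest` or `from-interruption`
// into a StartOptions value.
fn parse_start_options(args: &[&str]) -> Result<StartOptions, String> {
    match args {
        ["from-block", height] => height
            .parse::<u64>()
            .map(|height| StartOptions::FromBlock { height })
            .map_err(|e| format!("invalid height `{}`: {}", height, e)),
        ["from-latest"] => Ok(StartOptions::FromLatest),
        ["from-interruption"] => Ok(StartOptions::FromInterruption),
        _ => Err(format!("unsupported start options: {:?}", args)),
    }
}

// Hypothetical: the first word selects the network, the rest are start options.
fn parse_args(args: &[&str]) -> Result<ChainId, String> {
    match args {
        ["mainnet", rest @ ..] => parse_start_options(rest).map(ChainId::Mainnet),
        ["testnet", rest @ ..] => parse_start_options(rest).map(ChainId::Testnet),
        _ => Err(format!("expected `mainnet` or `testnet`, got {:?}", args)),
    }
}

fn main() {
    // Skip the binary name, as clap does.
    let owned: Vec<String> = std::env::args().skip(1).collect();
    let args: Vec<&str> = owned.iter().map(String::as_str).collect();
    match parse_args(&args) {
        Ok(chain_id) => println!("{:?}", chain_id),
        Err(e) => eprintln!("{}", e),
    }
}
```

With this sketch, `mainnet from-block 65359506` becomes `ChainId::Mainnet(StartOptions::FromBlock { height: 65359506 })`, and `testnet from-latest` becomes `ChainId::Testnet(StartOptions::FromLatest)`.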

Creating LakeConfig

To create LakeConfig we're going to use the config builder, LakeConfigBuilder. Fortunately, it's already available to us through the near-lake-framework dependency.

Let's instantiate a builder in place of // TODO: Config comment:

    let mut lake_config_builder = near_lake_framework::LakeConfigBuilder::default();

Notice that lake_config_builder is defined as mutable.

Now we need to set the chain we're going to index by matching on the provided ChainId:

    let mut lake_config_builder = near_lake_framework::LakeConfigBuilder::default();

    match &opts.chain_id {
        ChainId::Mainnet(_) => {
            lake_config_builder = lake_config_builder
                .mainnet();
        }
        ChainId::Testnet(_) => {
            lake_config_builder = lake_config_builder
                .testnet();
        }
    }

As you can see, depending on the variant of the ChainId enum, we modify lake_config_builder with one of the shortcuts: mainnet() or testnet().

The only parameter left to set is the one that matters most in this tutorial: start_block_height.

Normally we'd just pass the block height as a u64, but here we're implementing start options instead.
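Reassigning `lake_config_builder = lake_config_builder.mainnet();` suggests that LakeConfigBuilder's setters take the builder by value and return the updated one (the owned builder pattern). The sketch below illustrates that pattern, including a `build()` step that fails when a required field was never set. `ConfigBuilder`, `Config` and the `bucket` field are hypothetical stand-ins, not near-lake-framework's real types or fields.

```rust
// Hypothetical config and builder, for illustrating the owned builder pattern.
#[derive(Debug, PartialEq)]
struct Config {
    bucket: String,
    start_block_height: u64,
}

#[derive(Default)]
struct ConfigBuilder {
    bucket: Option<String>,
    start_block_height: Option<u64>,
}

impl ConfigBuilder {
    // Each setter consumes the builder and returns the updated one.
    fn mainnet(mut self) -> Self {
        self.bucket = Some("mainnet-bucket".to_string());
        self
    }

    fn testnet(mut self) -> Self {
        self.bucket = Some("testnet-bucket".to_string());
        self
    }

    fn start_block_height(mut self, height: u64) -> Self {
        self.start_block_height = Some(height);
        self
    }

    // Building fails if a required field is missing.
    fn build(self) -> Result<Config, String> {
        Ok(Config {
            bucket: self.bucket.ok_or("network is not set")?,
            start_block_height: self.start_block_height.ok_or("start_block_height is not set")?,
        })
    }
}

fn main() {
    let is_mainnet = true;
    let mut builder = ConfigBuilder::default();
    // Because the setters consume the builder, we reassign it in each branch.
    if is_mainnet {
        builder = builder.mainnet();
    } else {
        builder = builder.testnet();
    }
    let config = builder
        .start_block_height(65359506)
        .build()
        .expect("Failed to build config");
    println!("{:?}", config);
}
```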

Start options logic

Let's create a separate function, get_start_block_height, to hold the logic for determining start_block_height.

Just read the code below, don't copy it: it's not the final approach yet.

FromBlock { height: u64 }

Let's start by implementing from-block N, the simplest one:

async fn get_start_block_height(start_options: &StartOptions) -> u64 {
    match start_options {
        StartOptions::FromBlock { height } => *height,
        // other variants to follow
    }
}

OK, that's simple enough. What about the other match arms of StartOptions?

async fn get_start_block_height(start_options: &StartOptions) -> u64 {
    match start_options {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => // ???
    }
}

Er, how should we get the latest block from the network? We should query the JSON RPC and get the final block, extract its height and call it a day.
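Under the hood, "get the final block" is a single JSON-RPC call to the node's `block` method with `finality` set to `final`; the block height then sits at `result.header.height` in the response. The sketch below only builds that request body, which can be handy for checking an RPC endpoint by hand, for example by POSTing it with a `Content-Type: application/json` header to https://rpc.mainnet.near.org. `final_block_request_body` is a hypothetical helper, and we assume, rather than guarantee, that near-jsonrpc-client sends an equivalent payload.

```rust
// Hypothetical helper: the JSON-RPC 2.0 body for NEAR's `block` method,
// asking for the latest block with `final` finality. The server echoes the
// `id` back in its response, so any string works.
fn final_block_request_body(id: &str) -> String {
    format!(
        r#"{{"jsonrpc":"2.0","id":"{}","method":"block","params":{{"finality":"final"}}}}"#,
        id
    )
}

fn main() {
    // Could be sent with any HTTP client as a POST to https://rpc.mainnet.near.org
    println!("{}", final_block_request_body("dontcare"));
}
```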

FromLatest

To query the JSON RPC from Rust code, we need the near-jsonrpc-client crate (near-jsonrpc-client-rs on GitHub).

You can find a bunch of useful examples in the corresponding folder of the project's repository on GitHub.

Add it to the end of the [dependencies] section in Cargo.toml:

near-jsonrpc-client = "0.3.0"

The code for getting the final block height would look like the following:

use near_jsonrpc_client::{methods, JsonRpcClient};
use near_lake_framework::near_indexer_primitives::types::{BlockReference, Finality};

async fn final_block_height() -> u64 {
    let client = JsonRpcClient::connect("https://rpc.mainnet.near.org");
    let request = methods::block::RpcBlockRequest {
        block_reference: BlockReference::Finality(Finality::Final),
    };

    let latest_block = client.call(request).await.unwrap();

    latest_block.header.height
}

Nice and easy. However, the hardcoded "https://rpc.mainnet.near.org" doesn't look great, especially since we want to support both networks.

We could handle it by passing the JSON RPC URL to the get_start_block_height function like this:

async fn get_start_block_height(
    start_options: &StartOptions,
    rpc_url: &str,
) -> u64 {
    ...
}

...
    match &opts.chain_id {
        ChainId::Mainnet(start_options) => {
            lake_config_builder = lake_config_builder
                .mainnet()
                .start_block_height(
                    get_start_block_height(
                        start_options,
                        "https://rpc.mainnet.near.org",
                    ).await
                );
        }
        ChainId::Testnet(start_options) => {
            lake_config_builder = lake_config_builder
                .testnet()
                .start_block_height(
                    get_start_block_height(
                        start_options,
                        "https://rpc.testnet.near.org",
                    ).await
                );
        }
    }

Meh. It's ugly, and why pass the URL every time when only one of the three cases needs it?

Instead, we could pass the entire Opts to the get_start_block_height function:

async fn get_start_block_height(opts: &Opts) -> u64 {
    match &opts.chain_id {
        ChainId::Mainnet(start_options) => {
            match start_options {
                StartOptions::FromBlock { height } => *height,
                StartOptions::FromLatest => // ???
            }
        }
        // ChainId::Testnet(start_options) => { ...the same match all over again... }
    }
}

At least we have everything we need. Still, it looks ugly and would definitely lead to code duplication.

Instead, we propose adding an impl Opts block with a couple of helper methods: one that returns the JSON RPC URL and one that returns the StartOptions instance.

From here on, you can safely copy the code.

Somewhere under the StartOptions definition add the following:

impl Opts {
    pub fn rpc_url(&self) -> &str {
        match self.chain_id {
            ChainId::Mainnet(_) => "https://rpc.mainnet.near.org",
            ChainId::Testnet(_) => "https://rpc.testnet.near.org",
        }
    }

    pub fn start_options(&self) -> &StartOptions {
        match &self.chain_id {
            ChainId::Mainnet(args) | ChainId::Testnet(args) => args,
        }
    }
}

Now we can write our get_start_block_height function, along with a helper function, final_block_height, that queries the final block (we're going to reuse it later):

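// At the top of src/main.rs, next to the other imports:
use near_jsonrpc_client::{methods, JsonRpcClient};
use near_lake_framework::near_indexer_primitives::types::{BlockReference, Finality};

async fn get_start_block_height(opts: &Opts) -> u64 {
    match opts.start_options() {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => final_block_height(opts.rpc_url()).await,
        // a placeholder
        StartOptions::FromInterruption => 0,
    }
}

async fn final_block_height(rpc_url: &str) -> u64 {
    let client = JsonRpcClient::connect(rpc_url);
    let request = methods::block::RpcBlockRequest {
        block_reference: BlockReference::Finality(Finality::Final),
    };

    let latest_block = client.call(request).await.unwrap();

    latest_block.header.height
}

// In main, right after the `match &opts.chain_id { ... }` block, set the
// start block height and build the `config` the streamer expects:
//
//     let config = lake_config_builder
//         .start_block_height(get_start_block_height(&opts).await)
//         .build()
//         .expect("Failed to build LakeConfig");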

You may have noticed the FromInterruption arm and the placeholder comment. We added them so that the application builds right now and we can test that FromLatest works as expected.

Testing FromLatest

Credentials

Please make sure you have the credentials set up as described on the Credentials page. Otherwise you won't be able to get the code working.

Let's try to build and run our code

cargo build --release

./target/release/indexer mainnet from-latest

Once the code is built, you should see something like this in your terminal:

65364116 / shards 4
65364117 / shards 4
65364118 / shards 4
65364119 / shards 4
65364120 / shards 4

You can stop it by pressing CTRL+C

And now we can move on to FromInterruption

FromInterruption

To let the indexer know at which block it was interrupted, the indexer needs to store the block height somewhere, and it should do so at the end of the handle_streamer_message function.

In the boilerplate code you copy-pasted at the beginning of this tutorial, you may have noticed this line:

    std::fs::write("last_indexed_block", streamer_message.block.header.height.to_string().as_bytes()).unwrap();

It saves the last indexed block height into a file named last_indexed_block in the current working directory, that is, the directory you run the indexer from.

In a real-world indexer you'd probably use some other storage, depending on the toolset you're using.

But to show you the concept, we've gone with the easiest approach: saving it to a file.
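Writing the checkpoint with a plain std::fs::write is fine for this demo, but if the process is killed mid-write, the file could end up truncated, and from-interruption would then fail to parse it. A common alternative, sketched below, is to write to a temporary file and rename it over the real one. The helper name `save_last_indexed_block` and the `.tmp` suffix are our own choices, and the atomicity assumption only holds for a rename within the same filesystem on POSIX systems.

```rust
use std::fs;
use std::io;
use std::path::Path;

// Hypothetical helper: persist the checkpoint via write-then-rename, so a
// crash mid-write leaves either the old value or the new one, never a
// half-written file (assuming an atomic rename on the same filesystem).
fn save_last_indexed_block(path: &Path, height: u64) -> io::Result<()> {
    // e.g. "last_indexed_block" -> "last_indexed_block.tmp"
    let tmp = path.with_extension("tmp");
    fs::write(&tmp, height.to_string())?;
    fs::rename(&tmp, path)
}

fn main() -> io::Result<()> {
    let path = std::env::temp_dir().join("last_indexed_block_demo");
    // In the indexer, this call would sit at the end of handle_streamer_message.
    for height in [65364116u64, 65364117, 65364118] {
        save_last_indexed_block(&path, height)?;
    }
    println!("checkpoint: {}", fs::read_to_string(&path)?);
    fs::remove_file(&path)
}
```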

Now we need to implement reading the value from the file.

Note

If this is the first start of your indexer and you ask it to start from interruption, it won't find last_indexed_block and will simply fail.

That's not the behavior we want. So we assume that you want it to start from interruption if possible, and otherwise from the latest final block.

Let's finish up our get_start_block_height

async fn get_start_block_height(opts: &Opts) -> u64 {
    match opts.start_options() {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => final_block_height(opts.rpc_url()).await,
        StartOptions::FromInterruption => {
            match &std::fs::read("last_indexed_block") {
                Ok(contents) => {
                    // trim() tolerates a trailing newline in the file
                    String::from_utf8_lossy(contents).trim().parse().unwrap()
                }
                Err(e) => {
                    eprintln!("Cannot read last_indexed_block.\n{}\nStart indexer from latest final", e);
                    final_block_height(opts.rpc_url()).await
                }
            }
        },
    }
}

What we are doing here:

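  • We try to read the last_indexed_block file.
  • If the Result is Ok, we read the contents and parse them.
  • If the Result is Err, we print a message about the error and call final_block_height to get the final block from the network (the fallback we talked about earlier).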
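The decision logic of get_start_block_height can be exercised without a network or the Lake framework if the RPC call is passed in as a closure. The synchronous sketch below makes that assumption: `resolve_start_height` and its `fetch_final_height` parameter are hypothetical, standing in for the async final_block_height. Unlike the tutorial's version, which panics via unwrap() on unparsable contents, it also falls back to the latest final block when the file can't be parsed; whether that's preferable depends on how you'd rather handle a corrupt checkpoint.

```rust
use std::path::Path;

#[derive(Debug, Clone, PartialEq)]
enum StartOptions {
    FromBlock { height: u64 },
    FromLatest,
    FromInterruption,
}

// Hypothetical, synchronous stand-in for get_start_block_height.
// `fetch_final_height` replaces the async JSON RPC call (final_block_height),
// so the decision logic can be tested offline.
fn resolve_start_height<F>(options: &StartOptions, checkpoint: &Path, fetch_final_height: F) -> u64
where
    F: FnOnce() -> u64,
{
    match options {
        StartOptions::FromBlock { height } => *height,
        StartOptions::FromLatest => fetch_final_height(),
        StartOptions::FromInterruption => match std::fs::read_to_string(checkpoint) {
            // trim() tolerates a trailing newline in a hand-edited file.
            Ok(contents) => contents.trim().parse::<u64>().unwrap_or_else(|e| {
                eprintln!("Cannot parse {:?}: {}. Starting from latest final", checkpoint, e);
                fetch_final_height()
            }),
            // Typically the first run: no checkpoint yet.
            Err(e) => {
                eprintln!("Cannot read {:?}: {}. Starting from latest final", checkpoint, e);
                fetch_final_height()
            }
        },
    }
}

fn main() {
    let checkpoint = std::env::temp_dir().join("last_indexed_block_demo");
    let _ = std::fs::remove_file(&checkpoint);
    // Pretend the network's latest final block is 65364116.
    let fake_rpc = || 65364116u64;

    // No checkpoint yet: falls back to the "RPC".
    println!("{}", resolve_start_height(&StartOptions::FromInterruption, &checkpoint, fake_rpc));

    // With a checkpoint: resumes from the stored height.
    std::fs::write(&checkpoint, "9820238").unwrap();
    println!("{}", resolve_start_height(&StartOptions::FromInterruption, &checkpoint, fake_rpc));

    let _ = std::fs::remove_file(&checkpoint);
}
```

Passing the RPC call in as a closure is only a testing convenience; in the indexer you'd keep the async final_block_height and await it, as the tutorial does.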

Testing FromInterruption

To make sure everything works as expected, we'll start the indexer from genesis so that it stores the last indexed block. Then we'll restart it from interruption to confirm it doesn't start from the latest block.

Let's build and run from genesis.

Genesis Trick

To start a NEAR Lake Framework based indexer from the genesis block, just set start_block_height to 0.

cargo build --release
./target/release/indexer mainnet from-block 0

You will see something like:

9820210 / shards 1
9820214 / shards 1
9820216 / shards 1
9820219 / shards 1
9820221 / shards 1
9820226 / shards 1
9820228 / shards 1
9820230 / shards 1
9820231 / shards 1
9820232 / shards 1
9820233 / shards 1
9820235 / shards 1
9820236 / shards 1
9820237 / shards 1
9820238 / shards 1

Stop it by pressing CTRL+C

Memorize the last block height you see. In our example it is 9820238

Restart the indexer from interruption

./target/release/indexer mainnet from-interruption

You should see the indexer logs beginning from the block you've memorized.

Perfect! That's it. Now you can adapt the resulting code to your needs and use it in your own indexers.

Summary

You've seen how to extend your indexer with start options. As you can see, there is nothing complex here.

You can find the source code in the near-examples/lake-indexer-start-options repository.
