Microservices in Rust with GraphQL and Cap’n Proto

Jorge Luna
Published in Level Up Coding · 7 min read · Dec 31, 2020

Overview

I found this topic interesting because I’m currently working on a personal IoT project and I’m always looking for technologies that offer the best performance. Rust caught my attention because it has gained a lot of popularity in recent years. From my point of view, we can see it as an improved version of C++, since it has better memory management and safety. In addition, it is an imperative programming language and is not object oriented in the way that Java, Python, or Ruby are. We can also compare it to Go, since it also has some functional-style features, but I prefer Rust because of its safety and performance.

I’m also a Go programmer and I think Go is very good when it comes to concurrency; its creators definitely did an awesome job on its design and the engineering behind it. Thanks to that, we can deal with concurrency very easily using goroutines and channels, compared to other languages. This may be a downside for Rust, because it’s harder to work with concurrent tasks using an asynchronous crate like tokio or async-std. You can check this echo server example to get a clear understanding of what I mean: https://github.com/seguidor777/tls_echo

On the other hand, I chose GraphQL because it caught my attention and I prefer it over REST with JSON. Let me share a brief story:

Facebook publicly released GraphQL in 2015, after using it internally since 2012, and it has been gaining traction among mainstream developers ever since. GraphQL not only reduces noise, because clients ask only for the fields they need, but also gives consumers more control over how they use the application’s data. It has been replacing REST in many projects because it removes much of that complexity.

Last but not least, we have Cap’n Proto for the inter-service communication between the microservices. Think of Cap’n Proto as an RPC framework very similar to gRPC but designed to be much faster, because the in-memory layout of a message is also its wire format, so there is no separate encode/decode step. I’m not going into further details, but you may take a look at its website: https://capnproto.org/.
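
To make the "no encode/decode" idea more concrete, here is a minimal sketch in plain Rust. It is not the real Cap’n Proto wire format: the fixed 6-byte layout and the names `HumanView`, `id`, and `episode` are assumptions made up for this illustration. The point is only that a reader can wrap the received bytes and pull each field out by offset when it is asked for, instead of first parsing the whole message into a separate structure.

```rust
/// Hypothetical fixed layout (NOT the actual Cap'n Proto encoding):
/// bytes 0..4 hold the id (u32, little endian),
/// bytes 4..6 hold the episode (u16, little endian).
pub struct HumanView<'a> {
    buf: &'a [u8],
}

impl<'a> HumanView<'a> {
    /// Wraps a received buffer without copying or parsing it.
    /// Returns None if the buffer is too short for the layout.
    pub fn new(buf: &'a [u8]) -> Option<Self> {
        if buf.len() >= 6 {
            Some(HumanView { buf })
        } else {
            None
        }
    }

    /// Reads the id directly from its offset in the buffer.
    pub fn id(&self) -> u32 {
        let b = self.buf;
        u32::from_le_bytes([b[0], b[1], b[2], b[3]])
    }

    /// Reads the episode directly from its offset in the buffer.
    pub fn episode(&self) -> u16 {
        let b = self.buf;
        u16::from_le_bytes([b[4], b[5]])
    }
}
```

Nothing is decoded until an accessor is called, and a field that is never read costs nothing. The readers that the capnp crate generates from a schema follow the same idea in spirit, although the real format is considerably more elaborate than this sketch.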

Assumptions

I’ll assume that you already know what the microservices architecture is, so we won’t go into details.

Computer setup

Creating the microservices

This demo application just interacts with a mocked Star Wars database and is composed of two microservices:

Metadata: Performs queries to find or create a human in the database.

API Gateway: Receives the external GraphQL requests and redirects them to the Metadata service.

Note: We don’t have any external dependencies. But if you want, you could try CockroachDB with the sqlx crate.

Metadata service

We are ready to start coding the Metadata service. Let’s create a new project with the command cargo new metadata

Our Cargo.toml file should look as follows (versions may vary):

[package]
name = "metadata"
version = "0.1.0"
authors = ["Jorge Luna <jorge.luna@digitalonus.com>"]
edition = "2018"
[dependencies]
capnp = "0.14.0"
capnp-rpc = "0.14.0"
futures = "0.3.0"
async-std = { version = "1.8.0", features = ["unstable"] }
[build-dependencies]
capnpc = "0.14.1"

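Then we need to define the Cap’n Proto schema and the build script that generates a Rust module from it. The build script invokes the capnp compiler, so that tool must be installed on your machine.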

capnp/starwars.capnp

@0xc1da187e3c0d97cd;

interface StarWars {
  struct Human {
    id @0 :Text;
    name @1 :Text;
    homePlanet @2 :Text;
    appearsIn @3 :AppearsIn;

    enum AppearsIn {
      newHope @0;
      empire @1;
      jedi @2;
    }
  }

  showHuman @0 (id :Text) -> Human;
  createHuman @1 Human -> Human;
}

build.rs

fn main() {
    ::capnpc::CompilerCommand::new()
        .src_prefix("capnp")
        .file("capnp/starwars.capnp")
        .run()
        .expect("failed to compile schema");
}

Then we need to edit the main.rs file as follows.

mod rpc;
pub mod starwars_capnp {
    include!(concat!(env!("OUT_DIR"), "/starwars_capnp.rs"));
}

fn main() {
    rpc::server::run().expect("cannot run server")
}

Also create a new module for the RPC server. We need these files:

src/rpc/mod.rs

pub mod server;

src/rpc/server.rs

use crate::starwars_capnp;
use capnp::capability::Promise;
use capnp_rpc::{pry, RpcSystem};
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use futures::{AsyncReadExt, FutureExt};
use futures::task::LocalSpawnExt;

struct StarWars;

impl starwars_capnp::star_wars::Server for StarWars {
    fn show_human(
        &mut self,
        params: starwars_capnp::star_wars::ShowHumanParams,
        mut results: starwars_capnp::star_wars::ShowHumanResults,
    ) -> Promise<(), capnp::Error> {
        // Get a reader object for the received request
        let request_reader = pry!(params.get());
        // Read the requested ID (unused, because the response is mocked)
        let _id = request_reader.get_id();

        // Set the return values
        results.get().set_name("Luke");
        results.get().set_appears_in(starwars_capnp::star_wars::human::AppearsIn::NewHope);
        results.get().set_home_planet("Mars");
        Promise::ok(())
    }

    fn create_human(
        &mut self,
        params: starwars_capnp::star_wars::CreateHumanParams,
        mut results: starwars_capnp::star_wars::CreateHumanResults,
    ) -> Promise<(), capnp::Error> {
        // Get a reader object for the received request
        let request_reader = pry!(params.get());

        // Set the return values; pry! returns an error to the caller
        // instead of panicking when a field cannot be read
        results.get().set_id("1234");
        results.get().set_name(pry!(request_reader.get_name()));
        results.get().set_appears_in(pry!(request_reader.get_appears_in()));
        results.get().set_home_planet(pry!(request_reader.get_home_planet()));
        Promise::ok(())
    }
}

pub fn run() -> Result<(), Box<dyn std::error::Error>> {
    let addr = "127.0.0.1:8001";
    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    exec.run_until(async move {
        let listener = async_std::net::TcpListener::bind(&addr).await?;

        println!("Metadata: tcp://{}", addr);

        let client: starwars_capnp::star_wars::Client = capnp_rpc::new_client(StarWars);

        // Accept connections forever, spawning one RPC system per connection
        loop {
            let (stream, _) = listener.accept().await?;

            stream.set_nodelay(true)?;

            let (reader, writer) = stream.split();
            let network = VatNetwork::new(
                reader,
                writer,
                Side::Server,
                Default::default(),
            );

            let rpc_system =
                RpcSystem::new(Box::new(network), Some(client.clone().client));

            spawner.spawn_local(Box::pin(rpc_system.map(|_| ())))?;
        }
    })
}

We just defined the RPC methods that are going to be called for the inter-service communication.
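
The handlers above return hard-coded values. One way to back them with a mocked database is an in-memory map, sketched below using only the standard library. The `HumanStore` and `HumanRecord` types, their method names, and the sequential ID scheme are assumptions for illustration and are not part of the original project; the record also mirrors only a subset of the schema’s fields to keep the sketch short.

```rust
use std::collections::HashMap;

/// Plain record mirroring a subset of the fields of the `Human` schema struct.
#[derive(Clone, Debug, PartialEq)]
pub struct HumanRecord {
    pub id: String,
    pub name: String,
    pub home_planet: String,
}

/// Hypothetical in-memory stand-in for a real database.
#[derive(Default)]
pub struct HumanStore {
    next_id: u64,
    humans: HashMap<String, HumanRecord>,
}

impl HumanStore {
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores a new human under a freshly generated ID and returns the record.
    pub fn create(&mut self, name: &str, home_planet: &str) -> HumanRecord {
        self.next_id += 1;
        let record = HumanRecord {
            id: self.next_id.to_string(),
            name: name.to_string(),
            home_planet: home_planet.to_string(),
        };
        self.humans.insert(record.id.clone(), record.clone());
        record
    }

    /// Looks up a human by ID, returning None when it does not exist.
    pub fn find(&self, id: &str) -> Option<&HumanRecord> {
        self.humans.get(id)
    }
}
```

Because the server methods take `&mut self`, a store like this could plausibly live as a field of the `StarWars` struct, with `create_human` calling `create` and `show_human` calling `find`. That wiring is left out here.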

Once you have all the files, build and run the project with cargo run (Cargo downloads and compiles the dependencies automatically).

API Gateway service

Let’s create the API gateway service by creating another project with cargo new api-gateway

For this project we are going to reuse the same capnp/starwars.capnp and build.rs files, since both services share the same RPC schema.

Add these dependencies to the Cargo.toml file:

... Omitted for brevity

[dependencies]
actix-web = "3"
actix-cors = "0.4.0"
actix-rt = "1.1.0"
juniper_warp = "0.6.0"
serde = "1.0.103"
serde_json = "1.0.44"
serde_derive = "1.0.103"
juniper = "0.14.2"
capnp = "0.14.0"
capnp-rpc = "0.14.0"
futures = "0.3.0"
async-std = { version = "1.8.0", features = ["unstable"] }

[build-dependencies]
capnpc = "0.14.1"

And edit the main.rs file as follows:

mod graphql;
mod rpc;
pub mod starwars_capnp {
    include!(concat!(env!("OUT_DIR"), "/starwars_capnp.rs"));
}

use actix_cors::Cors;
use actix_web::{guard, middleware, web, App, Error, HttpResponse, HttpServer};
use juniper::http::graphiql::graphiql_source;
use juniper::http::GraphQLRequest;
use graphql::schema::{create_schema, Schema};
use std::io;
use std::sync::Arc;

// Serves the GraphiQL page, pointed at the GraphQL endpoint below
async fn graphiql() -> HttpResponse {
    let html = graphiql_source("http://127.0.0.1:8000/");
    HttpResponse::Ok()
        .content_type("text/html; charset=utf-8")
        .body(html)
}

// Executes a GraphQL request and returns the result as JSON
async fn graphql(
    st: web::Data<Arc<Schema>>,
    data: web::Json<GraphQLRequest>,
) -> Result<HttpResponse, Error> {
    // The execution is synchronous, so run it on the blocking thread pool
    let body = web::block(move || {
        let res = data.execute(&st, &());
        Ok::<_, serde_json::error::Error>(serde_json::to_string(&res)?)
    })
    .await?;
    Ok(HttpResponse::Ok()
        .content_type("application/json")
        .body(body))
}

#[actix_web::main]
async fn main() -> io::Result<()> {
    // Create Juniper schema
    let schema = Arc::new(create_schema());
    let addr = "127.0.0.1:8000";

    println!("API Gateway: http://{}", addr);

    // Start http server
    HttpServer::new(move || {
        App::new()
            .data(schema.clone())
            .wrap(middleware::Logger::default())
            .wrap(
                Cors::new()
                    .allowed_methods(vec!["POST", "GET"])
                    .supports_credentials()
                    .max_age(3600)
                    .finish(),
            )
            .service(web::resource("/").guard(guard::Post()).to(graphql))
            .service(web::resource("/").guard(guard::Get()).to(graphiql))
    })
    .bind(addr)?
    .run()
    .await
}

Basically we are running the GraphQL server and registering two routes on the same path: POST / executes the GraphQL requests, and GET / serves the GraphiQL playground that sends them.

For the schema, let’s create these files:

src/graphql/mod.rs

pub mod schema;

src/graphql/schema.rs

use crate::rpc;
use juniper::{FieldResult, RootNode};
use juniper::{GraphQLEnum, GraphQLInputObject, GraphQLObject};

#[derive(GraphQLEnum)]
pub enum Episode {
    NewHope,
    Empire,
    Jedi,
}

#[derive(GraphQLObject)]
#[graphql(description = "A human of any type")]
pub struct Human {
    pub id: String,
    pub name: String,
    pub appears_in: Episode,
    pub home_planet: String,
}

#[derive(GraphQLInputObject)]
#[graphql(description = "A new human of any type")]
pub struct NewHuman {
    pub name: String,
    pub appears_in: Episode,
    pub home_planet: String,
}

pub struct QueryRoot;

#[juniper::object]
impl QueryRoot {
    fn show_human(id: String) -> FieldResult<Human> {
        rpc::client::show_human(id)
    }
}

pub struct MutationRoot;

#[juniper::object]
impl MutationRoot {
    fn create_human(new_human: NewHuman) -> FieldResult<Human> {
        rpc::client::create_human(new_human)
    }
}

pub type Schema = RootNode<'static, QueryRoot, MutationRoot>;

pub fn create_schema() -> Schema {
    Schema::new(QueryRoot {}, MutationRoot {})
}

Here we are defining the structures for the JSON payloads and the implementations of the methods that query and mutate data.

Let’s define the RPC client for redirecting the calls to the server.

This module consists of these files:

src/rpc/mod.rs

pub mod client;

src/rpc/client.rs

use crate::graphql::schema::{Episode, Human, NewHuman};
use crate::starwars_capnp;
use capnp_rpc::RpcSystem;
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use futures::{AsyncReadExt, FutureExt};
use futures::task::LocalSpawnExt;
use juniper::FieldResult;

pub fn show_human(id: String) -> FieldResult<Human> {
    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    exec.run_until(async move {
        let mut rpc_system: RpcSystem<Side> = get_rpc_system().await?;
        let starwars_client: starwars_capnp::star_wars::Client = rpc_system.bootstrap(Side::Server);

        spawner.spawn_local(Box::pin(rpc_system.map(|_| ())))?;

        // Create the show_human request object
        let mut request = starwars_client.show_human_request();

        // Set the human ID
        request.get().set_id(&id);

        // Send the request and await the response
        let response = request.send().promise.await?;
        let human = response.get()?;

        // Read errors are propagated with `?` instead of panicking
        Ok(Human {
            id,
            name: human.get_name()?.to_string(),
            appears_in: appears_in_from_capnp(human.get_appears_in()?),
            home_planet: human.get_home_planet()?.to_string(),
        })
    })
}

pub fn create_human(new_human: NewHuman) -> FieldResult<Human> {
    let mut exec = futures::executor::LocalPool::new();
    let spawner = exec.spawner();

    exec.run_until(async move {
        let mut rpc_system: RpcSystem<Side> = get_rpc_system().await?;
        let starwars_client: starwars_capnp::star_wars::Client = rpc_system.bootstrap(Side::Server);

        spawner.spawn_local(Box::pin(rpc_system.map(|_| ())))?;

        // Create the create_human request object
        let mut request = starwars_client.create_human_request();

        // Set the human fields
        request.get().set_name(&new_human.name);
        request.get().set_home_planet(&new_human.home_planet);
        request.get().set_appears_in(appears_in_to_capnp(new_human.appears_in));

        // Send the request and await the response
        let response = request.send().promise.await?;
        let human = response.get()?;

        // Read errors are propagated with `?` instead of panicking
        Ok(Human {
            id: human.get_id()?.to_string(),
            name: human.get_name()?.to_string(),
            appears_in: appears_in_from_capnp(human.get_appears_in()?),
            home_planet: human.get_home_planet()?.to_string(),
        })
    })
}

// Maps the Cap'n Proto enum to the GraphQL enum
fn appears_in_from_capnp(appears_in: starwars_capnp::star_wars::human::AppearsIn) -> Episode {
    match appears_in {
        starwars_capnp::star_wars::human::AppearsIn::NewHope => Episode::NewHope,
        starwars_capnp::star_wars::human::AppearsIn::Empire => Episode::Empire,
        starwars_capnp::star_wars::human::AppearsIn::Jedi => Episode::Jedi,
    }
}

// Maps the GraphQL enum to the Cap'n Proto enum
fn appears_in_to_capnp(appears_in: Episode) -> starwars_capnp::star_wars::human::AppearsIn {
    match appears_in {
        Episode::NewHope => starwars_capnp::star_wars::human::AppearsIn::NewHope,
        Episode::Empire => starwars_capnp::star_wars::human::AppearsIn::Empire,
        Episode::Jedi => starwars_capnp::star_wars::human::AppearsIn::Jedi,
    }
}

// Opens a new TCP connection to the Metadata service and wraps it in an RPC system
async fn get_rpc_system() -> Result<RpcSystem<Side>, Box<dyn std::error::Error>> {
    let stream = async_std::net::TcpStream::connect("127.0.0.1:8001").await?;

    stream.set_nodelay(true)?;

    let (reader, writer) = stream.split();
    let network = Box::new(
        VatNetwork::new(reader, writer, Side::Client, Default::default())
    );

    Ok(RpcSystem::new(network, None))
}

Here we establish a connection with the RPC server on every call and perform the RPC with the same data that came in the GraphQL request.

We also need to translate the GraphQL enum type to the one generated from the Cap’n Proto schema, and back again for the response.
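
The two `appears_in_*` helper functions do this translation by hand. An alternative that some may find more idiomatic is to express the same mapping with the standard `From` trait, so call sites can use `.into()`. The sketch below is self-contained, so it uses a local `AppearsIn` enum as a stand-in for the generated `starwars_capnp::star_wars::human::AppearsIn`, and a plain `Episode` without the Juniper derive; both stand-ins are assumptions for illustration.

```rust
/// Stand-in for the enum generated from the Cap'n Proto schema.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum AppearsIn {
    NewHope,
    Empire,
    Jedi,
}

/// Stand-in for the GraphQL-facing enum from schema.rs.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum Episode {
    NewHope,
    Empire,
    Jedi,
}

impl From<AppearsIn> for Episode {
    fn from(value: AppearsIn) -> Self {
        match value {
            AppearsIn::NewHope => Episode::NewHope,
            AppearsIn::Empire => Episode::Empire,
            AppearsIn::Jedi => Episode::Jedi,
        }
    }
}

impl From<Episode> for AppearsIn {
    fn from(value: Episode) -> Self {
        match value {
            Episode::NewHope => AppearsIn::NewHope,
            Episode::Empire => AppearsIn::Empire,
            Episode::Jedi => AppearsIn::Jedi,
        }
    }
}
```

With impls like these, a call such as `appears_in_to_capnp(new_human.appears_in)` could be written as `new_human.appears_in.into()`. The exhaustive `match` also means the compiler flags both mappings if a variant is ever added to one enum only.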

Finally run this project in another shell with the command cargo run and open the URL that is shown.

Testing the API

Visit the URL http://127.0.0.1:8000 served by the API Gateway service to access the GraphiQL playground.

Run the following query to show human data:

{
  showHuman(id: "1235") {
    id
    name
    appearsIn
    homePlanet
  }
}

Run this query with some data to create a new human and see the result.

(Try changing the values)

mutation {
  createHuman(newHuman: {
    name: "Luke",
    appearsIn: NEW_HOPE,
    homePlanet: "Mars",
  }) {
    id
    name
    appearsIn
    homePlanet
  }
}
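
Notice that the names in these queries differ from the Rust code: `show_human` becomes `showHuman`, `home_planet` becomes `homePlanet`, and the `NewHope` variant becomes `NEW_HOPE`. Juniper applies these GraphQL naming conventions automatically. The sketch below only illustrates the convention with two hypothetical helper functions; it is not Juniper’s actual implementation and ignores edge cases such as acronyms or leading underscores.

```rust
/// Converts a Rust snake_case identifier to camelCase,
/// the convention used for GraphQL field and argument names.
pub fn snake_to_camel(name: &str) -> String {
    let mut out = String::with_capacity(name.len());
    let mut upper_next = false;
    for ch in name.chars() {
        if ch == '_' {
            // Drop the underscore and capitalize the next character
            upper_next = true;
        } else if upper_next {
            out.extend(ch.to_uppercase());
            upper_next = false;
        } else {
            out.push(ch);
        }
    }
    out
}

/// Converts a PascalCase variant name to SCREAMING_SNAKE_CASE,
/// the convention used for GraphQL enum values.
pub fn pascal_to_screaming_snake(name: &str) -> String {
    let mut out = String::with_capacity(name.len() + 4);
    for (i, ch) in name.chars().enumerate() {
        // Every uppercase letter after the first starts a new word
        if ch.is_uppercase() && i > 0 {
            out.push('_');
        }
        out.extend(ch.to_uppercase());
    }
    out
}
```

This is why the mutation above passes `appearsIn: NEW_HOPE` even though the Rust field is `appears_in` and the variant is `Episode::NewHope`.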

You can also load test the requests with any tool of your choice; I recommend bombardier or siege.
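
Load-testing tools send raw HTTP requests, so they need the JSON body that GraphiQL normally builds for you. The gateway’s POST / route deserializes a JSON object whose `query` key holds the GraphQL document, sent with the `application/json` content type. The helper below is a minimal sketch for producing that body with only the standard library; the function name `graphql_request_body` is hypothetical, and in a real project a JSON library such as serde_json would be the safer choice.

```rust
/// Builds the JSON body for a GraphQL POST request: {"query":"..."}.
/// Escapes the characters that JSON strings require to be escaped.
pub fn graphql_request_body(query: &str) -> String {
    let mut escaped = String::with_capacity(query.len());
    for ch in query.chars() {
        match ch {
            '"' => escaped.push_str("\\\""),
            '\\' => escaped.push_str("\\\\"),
            '\n' => escaped.push_str("\\n"),
            '\r' => escaped.push_str("\\r"),
            '\t' => escaped.push_str("\\t"),
            // Any other control character becomes a \uXXXX escape
            c if (c as u32) < 0x20 => escaped.push_str(&format!("\\u{:04x}", c as u32)),
            c => escaped.push(c),
        }
    }
    format!("{{\"query\":\"{}\"}}", escaped)
}
```

The resulting string can be saved to a file or passed to the tool’s request-body option, together with a POST method and the JSON content-type header.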

I hope you enjoyed this tutorial. As a software architect, my goal was to share my introduction to the world of microservices in Rust with others. If this post was helpful to you, please leave some claps and comments below.
