GoとgRPCでKVS的なものを作ってみた

正月で時間があったので、以前から触ってみたかったgRPCをGo言語から使い、キー・バリュー・ストアのようなものを作ってみた。

KVSといっても、GoのmapへのGet/Put/Delete/ScanをgRPC経由で叩けるようにしただけのもの。それだけだとあまり面白く無いので、gRPCらしく、Watch機能をつけてmapへの更新を監視できるようにした。

github.com

f:id:ono_matope:20160105002700g:plain

個人的には、HTTP/1.1 + JSON APIと比べた時のgRPC(HTTP/2 + ProtoBuf)のメリットや違いが気になっていたので、そのあたりを気をつけながら書いた。

開発の手順

サービス定義

まずはProtocol Buffers 3でKVSのサービスを定義する。サンプルを見ながら適当に書いた。

grpc-kvs/grpc-kvs.proto at master · matope/grpc-kvs · GitHub

syntax = "proto3";

package proto;

service Kvs {
  rpc Get(GetRequest) returns (GetResponse) {}
  rpc Put(PutRequest) returns (PutResponse) {}
  rpc Delete(DeleteRequest) returns (DeleteResponse) {}
  rpc Range(RangeRequest) returns (stream Entry) {}
  rpc Watch(WatchRequest) returns (stream Entry) {}
}

message Entry {
  string key = 1;
  string value = 2;
}

message GetRequest { string key = 1; }
message GetResponse { string value = 1; }

message PutRequest { string key = 1; string value = 2; }
message PutResponse {}

message DeleteRequest { string key = 1; }
message DeleteResponse {}

message RangeRequest { string startKey = 1; int32 maxKeys = 2; }

message WatchRequest { string prefix = 1; }

メソッドの引数と戻り値はmessageで定義した構造体である必要があるらしい。

RangeとWatchのreturnsにstreamとあるが、これはServer-side Streamingの宣言で、レスポンスをEntry構造体のストリームにすることができる。ストリームは非常に長いレスポンスの返却にも使えるし、Server-Sent Eventのようにサーバープッシュ用途にも使える。リクエストもストリーム化することができる(Client-side Streaming / Bidirectional Streaming)。

コード生成

上のkvs.protoから、protocコマンドでkvs.pb.goを生成する。

protoc  --go_out=plugins=grpc:. ./grpc-kvs.proto

ただし、事前にprotocol buffers 3.0 (未リリースなので、google/protobuf · GitHub からダウンロードしてインストールする必要がある)と github.com/golang/protobuf/protoc-gen-go をgo install しておく必要がある。

サービス実装

protocに成功すると、kvs.pb.goに下のようなサーバー用interfaceが定義されるので、これを実装してやれば良い。

grpc-kvs/grpc-kvs.pb.go at master · matope/grpc-kvs · GitHub

// Server API for Kvs service

type KvsServer interface {
    Get(context.Context, *GetRequest) (*GetResponse, error)
    Put(context.Context, *PutRequest) (*PutResponse, error)
    Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
    Range(*RangeRequest, Kvs_RangeServer) error
    Watch(*WatchRequest, Kvs_WatchServer) error
}

サーバー側実装はこのようになった。grpc-kvs/main.go at master · matope/grpc-kvs

type kvsServer struct {
    elements map[string]string
    mu       sync.RWMutex
    chans    map[chan pb.Entry]struct{}
}

func NewKvsServer() *kvsServer {
    return &kvsServer{
        elements: make(map[string]string),
        chans:    make(map[chan pb.Entry]struct{}),
    }
}

func (s *kvsServer) Get(ctx context.Context, r *pb.GetRequest) (*pb.GetResponse, error) {
    s.mu.RLock()
    defer s.mu.RUnlock()
    if val, ok := s.elements[r.Key]; ok {
        return &pb.GetResponse{
            Value: val,
        }, nil
    }
    return &pb.GetResponse{}, grpc.Errorf(codes.NotFound, "element not found value=[%s]", r.Key)
}

func (s *kvsServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.elements[r.Key] = r.Value

    // Notify updation
    for c := range s.chans {
        c <- pb.Entry{Key: r.Key, Value: r.Value}
    }
    return &pb.PutResponse{}, nil
}

func (s *kvsServer) Delete(ctx context.Context, r *pb.DeleteRequest) (*pb.DeleteResponse, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.elements, r.Key)

    // Notify deletion
    for c := range s.chans {
        c <- pb.Entry{Key: r.Key}
    }

    return &pb.DeleteResponse{}, nil
}

func (s *kvsServer) Range(r *pb.RangeRequest, rs pb.Kvs_RangeServer) error {
    s.mu.RLock()
    defer s.mu.RUnlock()

    // sort and filter  keys of elements
    keys := make([]string, 0, len(s.elements))
    for k := range s.elements {
        if k < r.StartKey {
            continue
        }
        keys = append(keys, k)
    }
    sort.Strings(keys)

    for _, k := range keys {
        if err := rs.Send(&pb.Entry{Key: k, Value: s.elements[k]}); err != nil {
            return err
        }
    }
    return nil
}

func (s *kvsServer) Watch(r *pb.WatchRequest, ws pb.Kvs_WatchServer) error {
    ech := make(chan pb.Entry)
    s.mu.Lock()
    s.chans[ech] = struct{}{}
    s.mu.Unlock()
    fmt.Println("Added New Watcher", ech)

    defer func() {
        s.mu.Lock()
        delete(s.chans, ech)
        s.mu.Unlock()
        close(ech)
        fmt.Println("Deleted Watcher", ech)
    }()

    for e := range ech {
        if !strings.HasPrefix(e.Key, r.Prefix) {
            continue
        }
        err := ws.Send(&e)
        if err != nil {
            return err
        }
    }
    return nil
}

通常のRPCは、フレームワーク経由でnet/httpのハンドラを書くの感覚と大して変わらない。

ただしコード生成によりリクエストフィールドが全て静的にアクセスできるので、ボイラープレートコードが不要で良い。

REST APIでいうところの404などのアプリケーションエラーコードは、grpc.Errorfを使い、 https://godoc.org/google.golang.org/grpc/codes に定義されているエラーコードを渡してやるのが流儀らしい。

Sever-side Streaming RPCは、ハンドラに専用のサーバーが渡されるので、そのサーバーにSendしてやることでストリームの要素を送出する(Range, Watch参照)。

サーバー起動部分はこれだけ。

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterKvsServer(s, NewKvsServer())
    s.Serve(lis)
}

クライアント実装

grpc-kvs/main.go at master · matope/grpc-kvs · GitHub

protocにより完全なクライアントコードが生成されているので、クライアントの接続部分はこれだけでよい。

func main() {
    conn, err := grpc.Dial(addr, grpc.WithInsecure())
    if err != nil {
        log.Fatalf("did not connect: %v", err)
    }
    defer conn.Close()
    c := pb.NewKvsClient(conn)
 

あとはKvsClientに実装されたメソッドを使えば良い。

所感

コンポーネント間通信をREST APIで設計する場合、そこそこ綺麗に書いたとしても、どうしても認証・ハンドラルーティング、リクエスト・レスポンスのエンコーディングなど、ボイラープレートコードは発生してしまう。

また、メソッドを増やすたびにURLやBody, ステータスコードなどREST表現の全体としての整合性を考慮しなければならず、しかも使用しているURLルータによっては変更を余儀なくされたりとつらさを感じていた。そんな理由でRPCに魅力を感じてはいたが、できること・できないことがいまひとつ分からず、乗り換えられずにいた。

gRPCを試してみて、サービスの定義さえしてしまえばサーバ・クライアント実装が自動生成されるため、ボイラープレートコードも発生せず、本質的なロジックに集中して書き進めていけるのは快適だった。コードの質もよくなっていると思う。そのうえで、HTTPハンドラに出来てgRPCに出来ないことはあまりなさそうということも分かった。例えば大きなファイルのやり取りはチャンク化してStreaming APIで流せば大丈夫そうとか。

RPCの使用にはコネクティビティの心配もあるが、gRPCは主要な多くのプログラミング言語へのコード生成に対応しているので、実際に問題なることはなさそう。むしろC++あたりだと普通のREST APIよりクライアント/サーバーの実装が楽になって結果的にRESTより相互接続性が上がりそう。

性能

似たようなHTTP/1.1 + JSON版実装を作り、id:lestrrat さんのベンチマーク実装 http://lestrrat.ldblog.jp/archives/43568967.html を参考に、Getのパフォーマンスを比較してみた。

go バージョン 並列数 gRPC HTTP/1.1+JSON
go1.5.2 100 16161.13 jobs/sec 37361.61 jobs/sec
go1.5.2 1 6705.33 jobs/sec 10151.33 jobs/sec
gotip 100 21528.42 jobs/sec 37931.31 jobs/sec

(サーバ・クライアント同居での雑な計測です)

遅い…。便利であるとはいえ、スループットが生REST時の半分以下になるのはちょっと厳しい。gotipでビルドしたところかなりパフォーマンスが改善しているので、今後のgoの実装の改善を期待しつつ、パフォーマンスが重要でないところから導入したほうがいいかもしれない。

参考