gRPCで動画ファイルの送受信を行ってみる

Go4 Advent Calendar 2018 11日目です。
今回はgRPCを使ってmp4で作成した動画ファイルをstreamを使って送受信してみようと思います。
コードがあまりきれいでないかもしれませんがよろしくお願いします。

はじめに

今回作成したサンプルコードはgithubに置いておきます。

github.com

gRPCとは

Googleが開発しているOpen SourceのRPC frameworkになり、Protocol Buffersを使ってサービス間のインターフェースをprotoファイルに定義します。また、試してませんが他言語間の通信も可能みたいです。

grpc.io

Protocol Buffersでどう定義するのかは下記のドキュメントに詳しく書かれています。

developers.google.com

protoファイルの定義

upload.proto

syntax = "proto3";
package upload;

service UploadHandler {
  rpc Upload(stream UploadRequest) returns (UploadReply) {};
}

message UploadRequest { bytes VideoData = 1; }

message UploadReply { string UploadStatus = 1; }

syntaxはproto3を利用しています。
serviceブロックの中に利用するメソッドを定義します。今回はmp4ファイルを受け取り、成功したらメッセージを返すようなメソッドを定義しています。
ここでUploadRequestの前にstreamをつけているのですが、これを記述することで一つのリクエストを複数のリクエストに分割することができます。今回はClient Streaming RPC方式になると思います。
また、このprotoファイルで生成したgoのファイルはclientとserverの両方必要になります。

Serverの作成

handler.go

func NewUploadServer(gserver *grpc.Server) {
    uploadserver := &server{}
    upload.RegisterUploadHandlerServer(gserver, uploadserver)
    reflection.Register(gserver)
}

type server struct{}

func (s *server) Upload(stream upload.UploadHandler_UploadServer) error {
    err := os.MkdirAll("Sample", 0777)
    if err != nil {
        return err
    }
    file, err := os.Create(filepath.Join("Sample", "tmp.mp4"))
    defer file.Close()
    if err != nil {
        return err
    }

    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        file.Write(resp.VideoData)
    }
    err = stream.SendAndClose(&upload.UploadReply{UploadStatus: "OK"})
    if err != nil {

        return err
    }
    return nil

}

handler.goではclientから受け取った動画データをSampleディレクトリにtmp.mp4として保存するような処理を行っています。成功したらOKの文字をclientに送信します。
main.go

func main() {
    lis, err := net.Listen("tcp", "localhost:8080")
    if err != nil {
        panic(err)
    }
    server := grpc.NewServer()

    handler.NewUploadServer(server)
    if err := server.Serve(lis); err != nil {
        panic(err)
    }
}

main.goではNewServerで空のgrpcサーバーの作成を行い、NewUploadServerで作成したserviceを登録してlocalhost:8080でサーバーを立ち上げます。NewServerは色々オプションを渡せるみたいです。

Clientの作成

main.go

func main() {
    connect, _ := grpc.Dial("localhost:8080", grpc.WithInsecure())

    defer connect.Close()
    uploadhalder := upload.NewUploadHandlerClient(connect)
    stream, err := uploadhalder.Upload(context.Background())
    err = Upload(stream)
    if err != nil {
        fmt.Println(err)
    }
}

func Upload(stream upload.UploadHandler_UploadClient) error {
    file, _ := os.Open("./sample.mp4")
    defer file.Close()
    buf := make([]byte, 1024)

    for {
        _, err := file.Read(buf)
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        stream.Send(&upload.UploadRequest{VideoData: buf})
    }
    resp, err := stream.CloseAndRecv()
    if err != nil {
        return err
    }
    fmt.Println(resp.UploadStatus)
    return nil
}

予めsample.mp4ファイルを対象のディレクトリに用意してmakeでどれくらい送るか定義しています。そしてfor内でひたすらserverにリクエストを投げまくってます。 送り終わったらCloseAndRecvでstreamをcloseしてサーバーからのメッセージを待ちます。うまく行けばOKと返ってきます。

完成!

実際に試してみるとclientから送られてきたSample.mp4ファイルがserverのSampleディレクトリ内にtmp.mp4として保存されたことが確認できました。

まとめ

  • streamを扱うことで動画データを分割して送ることができた。特に大容量の動画ファイルを送信したときに少しずつデータを送ることができるのでネットワークの負荷が減りそう
  • ライブストリーミングの変換処理とかstreamで実装すると良さそうな気がする