From 188bcf48fd008dfc23b2dd3eb5c831e1e7770b57 Mon Sep 17 00:00:00 2001 From: Francesco Capuano Date: Mon, 14 Apr 2025 15:40:15 +0200 Subject: [PATCH] add: grpc service between robot and remote policy server --- lerobot/scripts/server/async_inference.proto | 51 +++++ lerobot/scripts/server/async_inference_pb2.py | 44 ++++ .../server/async_inference_pb2_grpc.py | 191 ++++++++++++++++++ 3 files changed, 286 insertions(+) create mode 100644 lerobot/scripts/server/async_inference.proto create mode 100644 lerobot/scripts/server/async_inference_pb2.py create mode 100644 lerobot/scripts/server/async_inference_pb2_grpc.py diff --git a/lerobot/scripts/server/async_inference.proto b/lerobot/scripts/server/async_inference.proto new file mode 100644 index 00000000..b0299273 --- /dev/null +++ b/lerobot/scripts/server/async_inference.proto @@ -0,0 +1,51 @@ +// !/usr/bin/env python + +// Copyright 2024 The HuggingFace Inc. team. +// All rights reserved. + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at + +// http://www.apache.org/licenses/LICENSE-2.0 + +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +package async_inference; + +// AsyncInference: from Robot perspective +// Robot send observations to & executes action received from a remote Policy server +service AsyncInference { + // Robot -> Policy to share observations with a remote inference server + // Policy -> Robot to share actions predicted for given observations + rpc SendObservations(stream Observation) returns (Empty); + rpc StreamActions(Empty) returns (stream Action); + rpc Ready(Empty) returns (Empty); +} + +enum TransferState { + TRANSFER_UNKNOWN = 0; + TRANSFER_BEGIN = 1; + TRANSFER_MIDDLE = 2; + TRANSFER_END = 3; +} + +// Messages +message Observation { + // sent by Robot, to remote Policy + TransferState transfer_state = 1; + bytes data = 2; +} + +message Action { + // sent by remote Policy, to Robot + TransferState transfer_state = 1; + bytes data = 2; +} + +message Empty {} diff --git a/lerobot/scripts/server/async_inference_pb2.py b/lerobot/scripts/server/async_inference_pb2.py new file mode 100644 index 00000000..180ecd08 --- /dev/null +++ b/lerobot/scripts/server/async_inference_pb2.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: async_inference.proto +# Protobuf Python Version: 5.29.0 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 5, + 29, + 0, + '', + 'async_inference.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15\x61sync_inference.proto\x12\x0f\x61sync_inference\"S\n\x0bObservation\x12\x36\n\x0etransfer_state\x18\x01 \x01(\x0e\x32\x1e.async_inference.TransferState\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"N\n\x06\x41\x63tion\x12\x36\n\x0etransfer_state\x18\x01 \x01(\x0e\x32\x1e.async_inference.TransferState\x12\x0c\n\x04\x64\x61ta\x18\x02 \x01(\x0c\"\x07\n\x05\x45mpty*`\n\rTransferState\x12\x14\n\x10TRANSFER_UNKNOWN\x10\x00\x12\x12\n\x0eTRANSFER_BEGIN\x10\x01\x12\x13\n\x0fTRANSFER_MIDDLE\x10\x02\x12\x10\n\x0cTRANSFER_END\x10\x03\x32\xd9\x01\n\x0e\x41syncInference\x12J\n\x10SendObservations\x12\x1c.async_inference.Observation\x1a\x16.async_inference.Empty(\x01\x12\x42\n\rStreamActions\x12\x16.async_inference.Empty\x1a\x17.async_inference.Action0\x01\x12\x37\n\x05Ready\x12\x16.async_inference.Empty\x1a\x16.async_inference.Emptyb\x06proto3') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'async_inference_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_TRANSFERSTATE']._serialized_start=216 + _globals['_TRANSFERSTATE']._serialized_end=312 + _globals['_OBSERVATION']._serialized_start=42 + _globals['_OBSERVATION']._serialized_end=125 + _globals['_ACTION']._serialized_start=127 + _globals['_ACTION']._serialized_end=205 + _globals['_EMPTY']._serialized_start=207 + _globals['_EMPTY']._serialized_end=214 + _globals['_ASYNCINFERENCE']._serialized_start=315 + _globals['_ASYNCINFERENCE']._serialized_end=532 +# @@protoc_insertion_point(module_scope) diff --git a/lerobot/scripts/server/async_inference_pb2_grpc.py b/lerobot/scripts/server/async_inference_pb2_grpc.py new file mode 100644 index 00000000..30bf95fb --- /dev/null +++ b/lerobot/scripts/server/async_inference_pb2_grpc.py @@ -0,0 +1,191 @@ +# Generated by the gRPC Python protocol compiler plugin. DO NOT EDIT! +"""Client and server classes corresponding to protobuf-defined services.""" +import grpc +import warnings + +import async_inference_pb2 as async__inference__pb2 + +GRPC_GENERATED_VERSION = '1.71.0' +GRPC_VERSION = grpc.__version__ +_version_not_supported = False + +try: + from grpc._utilities import first_version_is_lower + _version_not_supported = first_version_is_lower(GRPC_VERSION, GRPC_GENERATED_VERSION) +except ImportError: + _version_not_supported = True + +if _version_not_supported: + raise RuntimeError( + f'The grpc package installed is at version {GRPC_VERSION},' + + f' but the generated code in async_inference_pb2_grpc.py depends on' + + f' grpcio>={GRPC_GENERATED_VERSION}.' + + f' Please upgrade your grpc module to grpcio>={GRPC_GENERATED_VERSION}' + + f' or downgrade your generated code using grpcio-tools<={GRPC_VERSION}.' + ) + + +class AsyncInferenceStub(object): + """AsyncInference: from Robot perspective + Robot send observations to & executes action received from a remote Policy server + """ + + def __init__(self, channel): + """Constructor. + + Args: + channel: A grpc.Channel. + """ + self.SendObservations = channel.stream_unary( + '/async_inference.AsyncInference/SendObservations', + request_serializer=async__inference__pb2.Observation.SerializeToString, + response_deserializer=async__inference__pb2.Empty.FromString, + _registered_method=True) + self.StreamActions = channel.unary_stream( + '/async_inference.AsyncInference/StreamActions', + request_serializer=async__inference__pb2.Empty.SerializeToString, + response_deserializer=async__inference__pb2.Action.FromString, + _registered_method=True) + self.Ready = channel.unary_unary( + '/async_inference.AsyncInference/Ready', + request_serializer=async__inference__pb2.Empty.SerializeToString, + response_deserializer=async__inference__pb2.Empty.FromString, + _registered_method=True) + + +class AsyncInferenceServicer(object): + """AsyncInference: from Robot perspective + Robot send observations to & executes action received from a remote Policy server + """ + + def SendObservations(self, request_iterator, context): + """Robot -> Policy to share observations with a remote inference server + Policy -> Robot to share actions predicted for given observations + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def StreamActions(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + def Ready(self, request, context): + """Missing associated documentation comment in .proto file.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + + +def add_AsyncInferenceServicer_to_server(servicer, server): + rpc_method_handlers = { + 'SendObservations': grpc.stream_unary_rpc_method_handler( + servicer.SendObservations, + request_deserializer=async__inference__pb2.Observation.FromString, + response_serializer=async__inference__pb2.Empty.SerializeToString, + ), + 'StreamActions': grpc.unary_stream_rpc_method_handler( + servicer.StreamActions, + request_deserializer=async__inference__pb2.Empty.FromString, + response_serializer=async__inference__pb2.Action.SerializeToString, + ), + 'Ready': grpc.unary_unary_rpc_method_handler( + servicer.Ready, + request_deserializer=async__inference__pb2.Empty.FromString, + response_serializer=async__inference__pb2.Empty.SerializeToString, + ), + } + generic_handler = grpc.method_handlers_generic_handler( + 'async_inference.AsyncInference', rpc_method_handlers) + server.add_generic_rpc_handlers((generic_handler,)) + server.add_registered_method_handlers('async_inference.AsyncInference', rpc_method_handlers) + + + # This class is part of an EXPERIMENTAL API. +class AsyncInference(object): + """AsyncInference: from Robot perspective + Robot send observations to & executes action received from a remote Policy server + """ + + @staticmethod + def SendObservations(request_iterator, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.stream_unary( + request_iterator, + target, + '/async_inference.AsyncInference/SendObservations', + async__inference__pb2.Observation.SerializeToString, + async__inference__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def StreamActions(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream( + request, + target, + '/async_inference.AsyncInference/StreamActions', + async__inference__pb2.Empty.SerializeToString, + async__inference__pb2.Action.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True) + + @staticmethod + def Ready(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_unary( + request, + target, + '/async_inference.AsyncInference/Ready', + async__inference__pb2.Empty.SerializeToString, + async__inference__pb2.Empty.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + _registered_method=True)