Using LAZenoh
======================

.. |ZENOH-REPO| raw:: html

   https://github.com/eclipse-zenoh/zenoh/tree/release-0.7.2-rc?tab=readme-ov-file

.. |LAZenoh-REPO| raw:: html

   https://bitbucket.org/neslabpolimi/lazenoh

The location-aware version of Zenoh (LAZenoh) works in the same way as the original version of Zenoh.
For configuration files and usage of the Zenoh router (``zenohd``), please visit the GitHub page |ZENOH-REPO|.

Python API
##########

Among the APIs that Zenoh provides, location-aware functionality is available in the Python API.
To use the location-aware API, import::

    import location_zenoh
    import zenoh

Just as Zenoh provides the ``open`` method to initialize a Zenoh session, the location-aware API lets you initialize a session with::

    session = location_zenoh.open(config)

The following location encoding techniques are supported:

1. BASE64
2. MGRS (Military Grid Reference System)
3. Bloom filter that uses a user-defined domain grid

Wrapper Signatures
##################

The wrapper is built around the original Zenoh ``session`` class.
We provide the same function names with new signatures for location management.

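The wrapper idea can be pictured with a small, self-contained sketch. Everything in it is hypothetical: ``FakeSession`` stands in for a Zenoh session, and ``LocationSession`` only illustrates how a wrapper might keep the same method name while attaching a position obtained from a user-supplied callback. It is not LAZenoh's implementation.

```python
class FakeSession:
    """Hypothetical stand-in for a Zenoh session; it only records what is put."""

    def __init__(self):
        self.sent = []

    def put(self, key, value):
        self.sent.append((key, value))


class LocationSession:
    """Hypothetical wrapper: same method name as the inner session, plus position handling."""

    def __init__(self, inner):
        self._inner = inner
        self._lifetime = None
        self._handler = None

    def declare_position_handler(self, lifetime, handler):
        # Remember how to obtain the position and how long it stays valid.
        self._lifetime = lifetime
        self._handler = handler

    def put(self, key, value):
        # Ask the callback for the current position, if one was declared.
        position = self._handler() if self._handler is not None else None
        # Assumption for illustration: the position simply travels with the value.
        self._inner.put(key, {"position": position, "value": value})


inner = FakeSession()
session = LocationSession(inner)
session.declare_position_handler(float("inf"), lambda: (1, 0.5))
session.put("test/topic", "hello")
print(inner.sent)
```

The caller keeps using ``put`` as before; only the session object changes, which is the property the wrapper aims for.
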
Here is the list of the new signatures::

    # NEW: Declare the callback used to get the client position, and the position lifetime
    def declare_position_handler(self, lifetime, handler)

    # NEW: For the Bloom filter, set the callback for the base domain data
    def set_domain_definition_handler(self, callback)

    # NEW: Update the already set position lifetime
    def set_position_lifetime(self, lifetime)

    def put(self, key: IntoKeyExpr, value: IntoValue,
            precision: Precision = Precision.POINT, encoding=None,
            priority: Priority = None,
            congestion_control: CongestionControl = None,
            sample_kind: SampleKind = None) -> Any

    def config(self) -> Config

    def delete(self, key: IntoKeyExpr, priority: Priority = None,
               congestion_control: CongestionControl = None)

    def get(self, selector: IntoSelector,
            handler: IntoHandler[Reply, Any, Receiver],
            location_shape: Shape = Shape.NONE, shape_data: Dict = None,
            precision: Precision = Precision.POINT,
            consolidation: QueryConsolidation = None,
            target: QueryTarget = None,
            value: IntoValue = None) -> Receiver

    def declare_keyexpr(self, key: IntoKeyExpr, device: ZDevice,
                        location_shape: Shape = Shape.NONE,
                        shape_data: Dict = None,
                        precision: Precision = Precision.POINT) -> KeyExpr

    def declare_queryable(self, key: IntoKeyExpr,
                          handler: IntoHandler[Query, Any, Any],
                          location_shape: Shape = Shape.NONE,
                          shape_data: Dict = None,
                          precision: Precision = Precision.POINT,
                          complete: bool = None) -> Queryable

    def declare_publisher(self, key: IntoKeyExpr,
                          precision: Precision = Precision.POINT,
                          priority: Priority = None,
                          congestion_control: CongestionControl = None) -> Publisher

    def declare_subscriber(self, key: IntoKeyExpr,
                           handler: IntoHandler[Sample, Any, Any],
                           location_shape: Shape = Shape.NONE,
                           shape_data: Dict = None,
                           precision: Precision = Precision.POINT,
                           reliability: Reliability = None) -> MobilitySubscriber | Subscriber

    def declare_pull_subscriber(self, key: IntoKeyExpr,
                                handler: IntoHandler[Sample, Any, Any],
                                location_shape: Shape = Shape.NONE,
                                shape_data: Dict = None,
                                precision: Precision = Precision.POINT,
                                reliability: Reliability = None) -> MobilitySubscriber | Subscriber

    def close(self)

    def info(self) -> Info

Publisher Example
#################

The following example shows a location-aware publisher that uses the BASE64 encoding::

    import time
    import argparse
    import json

    import numpy
    import zenoh
    import location_zenoh
    from location_zenoh import Coordinates

    # Callback function used to return the publisher position as a coordinate point
    def loc_cb():
        return Coordinates(1, 0.5)

    # --- Command line argument parsing --- --- --- --- --- ---
    parser = argparse.ArgumentParser(
        prog='locz_pub',
        description='zenoh location-aware pub example')
    parser.add_argument('--mode', '-m', dest='mode',
                        choices=['peer', 'client'],
                        type=str,
                        help='The zenoh session mode.')
    parser.add_argument('--connect', '-e', dest='connect',
                        metavar='ENDPOINT',
                        action='append',
                        type=str,
                        help='Endpoints to connect to.')
    parser.add_argument('--listen', '-l', dest='listen',
                        metavar='ENDPOINT',
                        action='append',
                        type=str,
                        help='Endpoints to listen on.')
    parser.add_argument('--key', '-k', dest='key',
                        default='test/topic',
                        type=str,
                        help='The key expression to publish onto.')
    parser.add_argument('--value', '-v', dest='value',
                        default='Pub from Python!',
                        type=str,
                        help='The value to publish.')
    parser.add_argument('--config', '-c', dest='config',
                        metavar='FILE',
                        type=str,
                        help='A configuration file.')

    args = parser.parse_args()
    conf = zenoh.Config.from_file(args.config) if args.config is not None else zenoh.Config()
    if args.mode is not None:
        conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
    if args.connect is not None:
        conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
    if args.listen is not None:
        conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
    key = args.key
    value = args.value

    # Initiate logging
    zenoh.init_logger()

    # Open a location-aware zenoh session
    session = location_zenoh.open(conf)

    # Declare the callback used to obtain the publisher position.
    # An infinite lifetime means the position never expires
    # (numpy.inf is used because the numpy.infty alias was removed in NumPy 2.0).
    session.declare_position_handler(numpy.inf, loc_cb)

    print(f"Declaring Publisher on '{key}'...")
    pub = session.declare_publisher(key)

    try:
        while True:
            time.sleep(1)
            buf = "[Test] This is a Test"
            print(f"Putting Data ('{key}': '{buf}')...")
            pub.put(buf)
    except KeyboardInterrupt:
        pass
    finally:
        # Close the session when the loop is interrupted (Ctrl+C)
        session.close()

Subscriber Example
##################

The following example shows a stationary subscriber that uses the BASE64 encoding and subscribes to a rectangular area::

    import time
    import argparse

    import numpy
    import zenoh
    from zenoh import Reliability, Sample
    import location_zenoh
    from location_zenoh import Coordinates, Shape

    # Location data definition
    lat = 10.0
    long = 20.1

    # Callback function used to return the subscriber position as a coordinate point
    def pos_cb():
        return Coordinates(lat, long)

    # --- Command line argument parsing --- --- --- --- --- ---
    parser = argparse.ArgumentParser(
        prog='sub_example',
        description='Subscriber Example')
    parser.add_argument('--config', '-c', dest='config',
                        metavar='FILE',
                        type=str,
                        help='A configuration file.')

    args = parser.parse_args()
    conf = zenoh.Config.from_file(args.config) if args.config is not None else zenoh.Config()
    key = "test/topic"

    # Object describing the rectangular area
    shape_data = {
        "min_lat": 9.0,
        "min_long": 11.0,
        "max_lat": 20.0,
        "max_long": 20.2
    }

    # Initiate logging
    zenoh.init_logger()

    print("Opening session...")
    session = location_zenoh.open(conf)

    # Declare the subscriber as stationary by passing an infinite position
    # lifetime together with the position callback
    # (numpy.inf is used because the numpy.infty alias was removed in NumPy 2.0).
    session.declare_position_handler(numpy.inf, pos_cb)

    print("Declaring Subscriber on '{}'...".format(key))

    def listener(sample: Sample):
        print(sample.payload.decode("utf-8"))

    # Declare the actual subscriber, passing Shape.GEOGRAPHIC_RANGE to select
    # the rectangular area defined in shape_data
    sub = session.declare_subscriber(key, listener,
                                     location_shape=Shape.GEOGRAPHIC_RANGE,
                                     shape_data=shape_data,
                                     reliability=Reliability.RELIABLE())

    try:
        # Sleep instead of busy-waiting while samples arrive in the listener
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        sub.undeclare()
        session.close()

More Examples
#############

A usage example for each encoding can be found in the folder ``python-api/examples`` in our code repository |LAZenoh-REPO|.
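
To make the ``shape_data`` dictionary from the subscriber example more concrete, the sketch below checks whether a coordinate falls inside such a rectangle. It is plain Python and does not use LAZenoh. The helper ``in_rectangle`` is hypothetical, and treating the bounds as inclusive is an assumption made for illustration, not a description of how LAZenoh matches publishers and subscribers.

```python
def in_rectangle(lat, long, shape_data):
    """Return True if (lat, long) lies inside the rectangle, bounds included."""
    return (shape_data["min_lat"] <= lat <= shape_data["max_lat"]
            and shape_data["min_long"] <= long <= shape_data["max_long"])


# Same rectangle as in the subscriber example
shape_data = {
    "min_lat": 9.0,
    "min_long": 11.0,
    "max_lat": 20.0,
    "max_long": 20.2,
}

# Position returned by the subscriber callback in the example above
print(in_rectangle(10.0, 20.1, shape_data))

# Position returned by the publisher callback in the publisher example
print(in_rectangle(1, 0.5, shape_data))
```

With the values used in the two examples, the subscriber position lies inside the rectangle while the publisher position does not, so it is worth adjusting one of them when experimenting with area-based subscriptions.
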