Using LAZenoh
======================
.. |ZENOH-REPO| raw:: html
https://github.com/eclipse-zenoh/zenoh/tree/release-0.7.2-rc?tab=readme-ov-file
.. |LAZenoh-REPO| raw:: html
https://bitbucket.org/neslabpolimi/lazenoh
The Location-aware version of Zenoh (LAZenoh) works equally to the original version of Zenoh.
For configuration files and the usage of the Zenoh router (zenohd) please visit the the github page |ZENOH-REPO|
Python API
##########
Among the different APIs Zenoh provide, we provide location-aware functionality in the python API.
To use the location-aware API you need to import::
import location_zenoh
import zenoh
in the same way zenoh provide the ``open`` method ti initialize a zenoh session, with the location-aware API you can init a session using::
session = location_zenoh.open(config)
As a location encoding techniques, we support:
1. BASE64
2. MGRS (Military Grid Reference System)
3. Bloom Filter that uses a user defined domain grid
Wrapper Signatures
#################
The wrapper is build around the original Zenoh ``session`` class. We provide the same functions name with new signatures for location management.
Here the list of the new signatures::
#NEW: Declare the callback of the to get client position and the position lifetime
def declare_position_handler(self, lifetime, handler)
#NEW: For Bloom filter, to set the callback for base domain data
def set_domain_definition_handler(self, callback)
#NEW: Update the already set position lifetime
def set_position_lifetime(self, lifetime)
def put(self, key: IntoKeyExpr, value: IntoValue, precision: Precision = Precision.POINT, encoding=None,
priority: Priority = None, congestion_control: CongestionControl = None,
sample_kind: SampleKind = None) -> Any
def config(self) -> Config
def delete(self, key: IntoKeyExpr,
priority: Priority = None, congestion_control: CongestionControl = None)
def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver],
location_shape: Shape = Shape.NONE, shape_data: Dict = None, precision: Precision = Precision.POINT,
consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None) -> Receiver
def declare_keyexpr(self, key: IntoKeyExpr, device: ZDevice, location_shape: Shape = Shape.NONE,
shape_data: Dict = None, precision: Precision = Precision.POINT) -> KeyExpr
def declare_queryable(self, key: IntoKeyExpr, handler: IntoHandler[Query, Any, Any],
location_shape: Shape = Shape.NONE, shape_data: Dict = None,
precision: Precision = Precision.POINT,
complete: bool = None) -> Queryable
def declare_publisher(self, key: IntoKeyExpr, precision: Precision = Precision.POINT, priority: Priority = None,
congestion_control: CongestionControl = None) -> Publisher
def declare_subscriber(self, key: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
location_shape: Shape = Shape.NONE, shape_data: Dict = None,
precision: Precision = Precision.POINT,
reliability: Reliability = None) -> MobilitySubscriber | Subscriber
def declare_pull_subscriber(self, key: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
location_shape: Shape = Shape.NONE, shape_data: Dict = None,
precision: Precision = Precision.POINT,
reliability: Reliability = None) -> MobilitySubscriber | Subscriber
def close(self)
def info(self) -> Info
Publisher Example
#################
We provide a Location-aware BASE64 encoding publisher example::
import sys
import time
import argparse
import json
import numpy
import location_zenoh
from zenoh import CongestionControl
from location_zenoh import Coordinates, Precision
import zenoh
#Callback function used to return publisher position as a coordinate point
def loc_cb():
return Coordinates(1, 0.5)
# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
prog='locz_pub',
description='zenoh location awarene pub example')
parser.add_argument('--mode', '-m', dest='mode',
choices=['peer', 'client'],
type=str,
help='The zenoh session mode.')
parser.add_argument('--connect', '-e', dest='connect',
metavar='ENDPOINT',
action='append',
type=str,
help='Endpoints to connect to.')
parser.add_argument('--listen', '-l', dest='listen',
metavar='ENDPOINT',
action='append',
type=str,
help='Endpoints to listen on.')
parser.add_argument('--key', '-k', dest='key',
default='test/topic',
type=str,
help='The key expression to publish onto.')
parser.add_argument('--value', '-v', dest='value',
default='Pub from Python!',
type=str,
help='The value to publish.')
parser.add_argument('--config', '-c', dest='config',
metavar='FILE',
type=str,
help='A configuration file.')
args = parser.parse_args()
conf = zenoh.Config.from_file(args.config) if args.config is not None else zenoh.Config()
if args.mode is not None:
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
if args.connect is not None:
conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
if args.listen is not None:
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key
value = args.value
# initiate logging
zenoh.init_logger()
#Open a location-aware zenoh session
session = location_zenoh.open(conf)
#Declare the callback to obtain publisher position
session.declare_position_handler(numpy.infty, loc_cb)
print(f"Declaring Publisher on '{key}'...")
pub = session.declare_publisher(key)
while True:
time.sleep(1)
buf = f"[Test] This is a Test"
print(f"Putting Data ('{key}': '{buf}')...")
pub.put(buf)
session.close()
Subscriber Example
#################
An example of stationary subscriber using the BASE64 encoding, subscribing using a rectangular area::
import sys
import time
import argparse
import json
import zenoh
from zenoh import Reliability, Sample
import location_zenoh
from location_zenoh import Coordinates, Shape
#Callback function used to return subscriber position as a coordinate point
def pos_cb():
return Coordinates(lat, long)
# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
prog='sub_example',
description='Subscriber Example')
parser.add_argument('--config', '-c', dest='config',
metavar='FILE',
type=str,
help='A configuration file.')
args = parser.parse_args()
conf = zenoh.Config.from_file(
args.config) if args.config is not None else zenoh.Config()
key = "test/topic"
#Location data definition
lat = 10.0
long = 20.1
#Object describing the rectangular area
shape_data = {
"min_lat" : 9.0,
"min_long" : 11.0,
"max_lat": 20.0,
"max_long": 20.2
}
# initiate logging
zenoh.init_logger()
print("Opening session...")
session = location_zenoh.open(conf)
#Declaring the subscriber as a static with numpy.infty, and the position callback
session.declare_position_handler(numpy.infty, pos_cb)
print("Declaring Subscriber on '{}'...".format(key))
def listener(sample: Sample):
print(sample.payload.decode("utf-8"))
#Declaring the actual subscriber, passing Shape.GEOGRAPHIC_RANGE to define the rectangular are defined in shape_data
sub = session.declare_subscriber(key, listener, location_shape= Shape.GEOGRAPHIC_RANGE, shape_data= shape_data,reliability=Reliability.RELIABLE())
while True:
pass
sub.undeclare()
session.close()
More Examples
#############
A usage example for each encoding can be found in the folder ``python-api/examples`` in our code repository |LAZenoh-REPO|.