Using LAZenoh
The location-aware version of Zenoh (LAZenoh) works the same way as the original version of Zenoh.
For configuration files and usage of the Zenoh router (zenohd), please visit the GitHub page https://github.com/eclipse-zenoh/zenoh/tree/release-0.7.2-rc?tab=readme-ov-file
Python API
Among the different APIs that Zenoh provides, we offer location-aware functionality in the Python API.
To use the location-aware API you need to import:
import location_zenoh
import zenoh
In the same way that zenoh provides the open
method to initialize a zenoh session, with the location-aware API you can initialize a session using:
session = location_zenoh.open(config)
We support the following location encoding techniques:
BASE64
MGRS (Military Grid Reference System)
Bloom Filter that uses a user defined domain grid
Wrapper Signatures
The wrapper is built around the original Zenoh session
class. We provide the same function names with new signatures for location management.
Here is the list of the new signatures:
#NEW: Declare the callback used to get the client position, together with the position lifetime
def declare_position_handler(self, lifetime, handler)
#NEW: For the Bloom filter encoding, set the callback for base domain data
def set_domain_definition_handler(self, callback)
#NEW: Update the already-set position lifetime
def set_position_lifetime(self, lifetime)
# The methods below keep their Zenoh session names; where present, the
# precision, location_shape, shape_data and device parameters carry the
# location-management information.
def put(self, key: IntoKeyExpr, value: IntoValue, precision: Precision = Precision.POINT, encoding=None,
priority: Priority = None, congestion_control: CongestionControl = None,
sample_kind: SampleKind = None) -> Any
def config(self) -> Config
def delete(self, key: IntoKeyExpr,
priority: Priority = None, congestion_control: CongestionControl = None)
def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver],
location_shape: Shape = Shape.NONE, shape_data: Dict = None, precision: Precision = Precision.POINT,
consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None) -> Receiver
def declare_keyexpr(self, key: IntoKeyExpr, device: ZDevice, location_shape: Shape = Shape.NONE,
shape_data: Dict = None, precision: Precision = Precision.POINT) -> KeyExpr
def declare_queryable(self, key: IntoKeyExpr, handler: IntoHandler[Query, Any, Any],
location_shape: Shape = Shape.NONE, shape_data: Dict = None,
precision: Precision = Precision.POINT,
complete: bool = None) -> Queryable
def declare_publisher(self, key: IntoKeyExpr, precision: Precision = Precision.POINT, priority: Priority = None,
congestion_control: CongestionControl = None) -> Publisher
def declare_subscriber(self, key: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
location_shape: Shape = Shape.NONE, shape_data: Dict = None,
precision: Precision = Precision.POINT,
reliability: Reliability = None) -> MobilitySubscriber | Subscriber
def declare_pull_subscriber(self, key: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
location_shape: Shape = Shape.NONE, shape_data: Dict = None,
precision: Precision = Precision.POINT,
reliability: Reliability = None) -> MobilitySubscriber | Subscriber
def close(self)
def info(self) -> Info
Publisher Example
We provide a Location-aware BASE64 encoding publisher example:
import sys
import time
import argparse
import json
import numpy
import location_zenoh
from zenoh import CongestionControl
from location_zenoh import Coordinates, Precision
import zenoh
# Callback used to report the publisher position as a coordinate point
def loc_cb():
    """Return the publisher's position as the fixed point ``Coordinates(1, 0.5)``."""
    position = Coordinates(1, 0.5)
    return position
# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
    prog='locz_pub',
    description='zenoh location awarene pub example')
parser.add_argument('--mode', '-m', dest='mode',
                    choices=['peer', 'client'],
                    type=str,
                    help='The zenoh session mode.')
parser.add_argument('--connect', '-e', dest='connect',
                    metavar='ENDPOINT',
                    action='append',
                    type=str,
                    help='Endpoints to connect to.')
parser.add_argument('--listen', '-l', dest='listen',
                    metavar='ENDPOINT',
                    action='append',
                    type=str,
                    help='Endpoints to listen on.')
parser.add_argument('--key', '-k', dest='key',
                    default='test/topic',
                    type=str,
                    help='The key expression to publish onto.')
parser.add_argument('--value', '-v', dest='value',
                    default='Pub from Python!',
                    type=str,
                    help='The value to publish.')
parser.add_argument('--config', '-c', dest='config',
                    metavar='FILE',
                    type=str,
                    help='A configuration file.')

args = parser.parse_args()

# Start from the configuration file when one is given, otherwise from defaults,
# then let the command-line options override individual settings.
conf = zenoh.Config.from_file(args.config) if args.config is not None else zenoh.Config()
if args.mode is not None:
    conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
if args.connect is not None:
    conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
if args.listen is not None:
    conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key
# NOTE: --value is parsed, but this example publishes a fixed test string below.
value = args.value

# initiate logging
zenoh.init_logger()

# Open a location-aware zenoh session
session = location_zenoh.open(conf)
try:
    # Declare the callback used to obtain the publisher position.
    # numpy.inf is used instead of numpy.infty, which was removed in NumPy 2.0.
    session.declare_position_handler(numpy.inf, loc_cb)

    print(f"Declaring Publisher on '{key}'...")
    pub = session.declare_publisher(key)

    # Publish once per second until interrupted with Ctrl-C.
    while True:
        time.sleep(1)
        buf = "[Test] This is a Test"
        print(f"Putting Data ('{key}': '{buf}')...")
        pub.put(buf)
except KeyboardInterrupt:
    pass
finally:
    # Reached on Ctrl-C or on error, so the session is always closed.
    session.close()
Subscriber Example
An example of a stationary subscriber using the BASE64 encoding, subscribing to a rectangular area:
import argparse
import json
import sys
import time

import numpy
import zenoh
from zenoh import Reliability, Sample

import location_zenoh
from location_zenoh import Coordinates, Shape
# Callback used to report the subscriber position as a coordinate point
def pos_cb():
    """Build the subscriber's position from the module-level ``lat`` and ``long`` values."""
    position = Coordinates(lat, long)
    return position
# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
    prog='sub_example',
    description='Subscriber Example')
parser.add_argument('--config', '-c', dest='config',
                    metavar='FILE',
                    type=str,
                    help='A configuration file.')

args = parser.parse_args()
conf = zenoh.Config.from_file(
    args.config) if args.config is not None else zenoh.Config()
key = "test/topic"

# Location data definition (read by pos_cb when the position is requested)
lat = 10.0
long = 20.1

# Object describing the rectangular area
shape_data = {
    "min_lat" : 9.0,
    "min_long" : 11.0,
    "max_lat": 20.0,
    "max_long": 20.2
}

# initiate logging
zenoh.init_logger()


def listener(sample: Sample):
    """Print the payload of each received sample, decoded as UTF-8."""
    print(sample.payload.decode("utf-8"))


print("Opening session...")
session = location_zenoh.open(conf)
try:
    # Declare the subscriber as static by passing an infinite position lifetime,
    # together with the position callback. numpy.inf is used instead of
    # numpy.infty, which was removed in NumPy 2.0.
    session.declare_position_handler(numpy.inf, pos_cb)

    print("Declaring Subscriber on '{}'...".format(key))

    # Declare the actual subscriber, passing Shape.GEOGRAPHIC_RANGE to select
    # the rectangular area defined in shape_data.
    sub = session.declare_subscriber(key, listener, location_shape=Shape.GEOGRAPHIC_RANGE,
                                     shape_data=shape_data, reliability=Reliability.RELIABLE())
    try:
        # Samples are delivered to the listener callback; sleep instead of
        # busy-waiting so the main thread does not burn a CPU core.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        pass
    finally:
        sub.undeclare()
finally:
    # Reached on Ctrl-C or on error, so the session is always closed.
    session.close()
More Examples
A usage example for each encoding can be found in the folder python-api/examples
in our code repository https://bitbucket.org/neslabpolimi/lazenoh.