Using LAZenoh

The Location-aware version of Zenoh (LAZenoh) works in the same way as the original version of Zenoh.

For configuration files and the usage of the Zenoh router (zenohd), please visit the GitHub page https://github.com/eclipse-zenoh/zenoh/tree/release-0.7.2-rc?tab=readme-ov-file

Python API

Among the different APIs Zenoh provides, we offer location-aware functionality in the Python API.

To use the location-aware API you need to import:

import location_zenoh
import zenoh

In the same way Zenoh provides the open method to initialize a Zenoh session, with the location-aware API you can initialize a session using:

session = location_zenoh.open(config)

As location encoding techniques, we support:

  1. BASE64

  2. MGRS (Military Grid Reference System)

  3. Bloom Filter that uses a user-defined domain grid

Wrapper Signatures

The wrapper is built around the original Zenoh session class. We provide the same function names with new signatures for location management.

Here is the list of the new signatures:

#NEW: Declare the callback used to get the client position, and the position lifetime
def declare_position_handler(self, lifetime, handler)

#NEW: For Bloom filter, to set the callback for base domain data
def set_domain_definition_handler(self, callback)

#NEW: Update the already set position lifetime
def set_position_lifetime(self, lifetime)

def put(self, key: IntoKeyExpr, value: IntoValue, precision: Precision = Precision.POINT, encoding=None,
        priority: Priority = None, congestion_control: CongestionControl = None,
        sample_kind: SampleKind = None) -> Any

def config(self) -> Config

def delete(self, key: IntoKeyExpr,
           priority: Priority = None, congestion_control: CongestionControl = None)

def get(self, selector: IntoSelector, handler: IntoHandler[Reply, Any, Receiver],
        location_shape: Shape = Shape.NONE, shape_data: Dict = None, precision: Precision = Precision.POINT,
        consolidation: QueryConsolidation = None, target: QueryTarget = None, value: IntoValue = None) -> Receiver

def declare_keyexpr(self, key: IntoKeyExpr, device: ZDevice, location_shape: Shape = Shape.NONE,
                    shape_data: Dict = None, precision: Precision = Precision.POINT) -> KeyExpr

def declare_queryable(self, key: IntoKeyExpr, handler: IntoHandler[Query, Any, Any],
                      location_shape: Shape = Shape.NONE, shape_data: Dict = None,
                      precision: Precision = Precision.POINT,
                      complete: bool = None) -> Queryable

def declare_publisher(self, key: IntoKeyExpr, precision: Precision = Precision.POINT, priority: Priority = None,
                      congestion_control: CongestionControl = None) -> Publisher

def declare_subscriber(self, key: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
                       location_shape: Shape = Shape.NONE, shape_data: Dict = None,
                       precision: Precision = Precision.POINT,
                       reliability: Reliability = None) -> MobilitySubscriber | Subscriber

def declare_pull_subscriber(self, key: IntoKeyExpr, handler: IntoHandler[Sample, Any, Any],
                            location_shape: Shape = Shape.NONE, shape_data: Dict = None,
                            precision: Precision = Precision.POINT,
                            reliability: Reliability = None) -> MobilitySubscriber | Subscriber

def close(self)

def info(self) -> Info

Publisher Example

We provide a Location-aware BASE64 encoding publisher example:

import sys
import time
import argparse
import json

import numpy

import location_zenoh
from zenoh import CongestionControl
from location_zenoh import Coordinates, Precision
import zenoh

# Position callback handed to the session: reports where the publisher is.
def loc_cb():
    """Return the publisher position as the fixed point ``Coordinates(1, 0.5)``."""
    publisher_position = Coordinates(1, 0.5)
    return publisher_position



# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
    prog='locz_pub',
    description='zenoh location awarene pub example')
parser.add_argument('--mode', '-m', dest='mode',
                    choices=['peer', 'client'],
                    type=str,
                    help='The zenoh session mode.')
parser.add_argument('--connect', '-e', dest='connect',
                    metavar='ENDPOINT',
                    action='append',
                    type=str,
                    help='Endpoints to connect to.')
parser.add_argument('--listen', '-l', dest='listen',
                    metavar='ENDPOINT',
                    action='append',
                    type=str,
                    help='Endpoints to listen on.')
parser.add_argument('--key', '-k', dest='key',
                    default='test/topic',
                    type=str,
                    help='The key expression to publish onto.')
parser.add_argument('--value', '-v', dest='value',
                    default='Pub from Python!',
                    type=str,
                    help='The value to publish.')
parser.add_argument('--config', '-c', dest='config',
                    metavar='FILE',
                    type=str,
                    help='A configuration file.')

args = parser.parse_args()
# Start from the given configuration file when there is one, otherwise from a
# freshly constructed zenoh.Config(); command line options are applied on top.
conf = zenoh.Config.from_file(args.config) if args.config is not None else zenoh.Config()
if args.mode is not None:
    conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode))
if args.connect is not None:
    conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect))
if args.listen is not None:
    conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen))
key = args.key
value = args.value

# Initiate logging
zenoh.init_logger()

# Open a location-aware zenoh session
session = location_zenoh.open(conf)
try:
    # Declare the callback used to obtain the publisher position, with an
    # infinite position lifetime. numpy.inf is used because the numpy.infty
    # alias was removed in NumPy 2.0.
    session.declare_position_handler(numpy.inf, loc_cb)

    print(f"Declaring Publisher on '{key}'...")

    pub = session.declare_publisher(key)

    # Publish the value given with --value once per second until interrupted.
    while True:
        time.sleep(1)
        buf = f"[Test] {value}"
        print(f"Putting Data ('{key}': '{buf}')...")
        pub.put(buf)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless example.
    pass
finally:
    # Always release the session, even when the loop is interrupted or fails.
    session.close()

Subscriber Example

An example of a stationary subscriber using the BASE64 encoding, subscribing using a rectangular area:

import sys
import time
import argparse
import json

import zenoh
from zenoh import Reliability, Sample
import location_zenoh
from location_zenoh import Coordinates, Shape

# Position callback handed to the session: reports where the subscriber is.
def pos_cb():
    """Return a ``Coordinates`` built from the module-level ``lat`` and ``long``."""
    subscriber_position = Coordinates(lat, long)
    return subscriber_position

# --- Command line argument parsing --- --- --- --- --- ---
parser = argparse.ArgumentParser(
    prog='sub_example',
    description='Subscriber Example')
parser.add_argument('--config', '-c', dest='config',
                    metavar='FILE',
                    type=str,
                    help='A configuration file.')


args = parser.parse_args()
# Start from the given configuration file when there is one, otherwise from a
# freshly constructed zenoh.Config().
conf = zenoh.Config.from_file(
    args.config) if args.config is not None else zenoh.Config()
key = "test/topic"


# Location data definition; pos_cb reads these two values each time it is called
lat = 10.0
long = 20.1

# Object describing the rectangular area
shape_data = {
    "min_lat": 9.0,
    "min_long": 11.0,
    "max_lat": 20.0,
    "max_long": 20.2,
}
# Initiate logging
zenoh.init_logger()

print("Opening session...")
session = location_zenoh.open(conf)
# Declare the subscriber as static by giving its position an infinite lifetime,
# together with the position callback. float("inf") is used because this
# example does not import numpy (and numpy.infty was removed in NumPy 2.0).
session.declare_position_handler(float("inf"), pos_cb)

print("Declaring Subscriber on '{}'...".format(key))

def listener(sample: Sample):
    """Print the payload of a received sample, decoded as UTF-8 text."""
    decoded_payload = sample.payload.decode("utf-8")
    print(decoded_payload)

# Declare the actual subscriber, passing Shape.GEOGRAPHIC_RANGE to select the
# rectangular area described by shape_data; received samples go to listener.
sub = session.declare_subscriber(
    key,
    listener,
    location_shape=Shape.GEOGRAPHIC_RANGE,
    shape_data=shape_data,
    reliability=Reliability.RELIABLE(),
)

try:
    # Nothing is left to do here except keep the process running. Sleeping
    # avoids the busy-wait of `while True: pass`, which pins a CPU core.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop this endless example.
    pass
finally:
    # Always undeclare the subscriber and release the session on exit.
    sub.undeclare()
    session.close()

More Examples

A usage example for each encoding can be found in the folder python-api/examples in our code repository https://bitbucket.org/neslabpolimi/lazenoh.