Programming with Volga

Using Volga from a Python application deployed with Avassa

Approles and policies

In order to use Volga, your application will need to be granted a policy that allows some kind of access to at least one Volga topic. To create a simple policy that allows full access to a single topic, run the following command:

supctl -r create policy policies <<EOF
name: mypolicy
volga:
  topics:
    - name: "mytopic"
      operations:
        create: allow
        delete: allow
        produce: allow
        consume: allow
EOF

This policy allows full access to the topic mytopic. For further information on configuring policies, refer to the Policies howto.

Applications typically authenticate using approles. Each approle can provide one or more policies. Run the following command to create an approle called myapprole and give it the mypolicy policy:

supctl -r create strongbox authentication approles <<EOF
name: myapprole
token-policies: ['mypolicy']
EOF

In order to authenticate as myapprole, the application must provide two system-generated parameters: the role-id for myapprole and the corresponding approle secret. The role-id is generated when the approle is created and can be retrieved with the following command:

supctl show strongbox authentication approles myapprole | grep role-id
role-id: 324bc0bf-40c3-4aa5-bb58-360b830405ac

The role-id can be bundled with your application. The approle secret, on the other hand, is generated when the container is deployed and must be passed to your application through an environment variable defined in the application specification:

name: myapp
services:
  - name: myapp-service
    containers:
      - name: myapp-container
        image: myapp
        cmd: [/myapp/myapp.py]
        env:
          APPROLE_SECRET_ID: "${SYS_APPROLE_SECRET_ID}"
          API_CA_CERT: "${SYS_API_CA_CERT}"
          PYTHONPATH: /myapp
        approle: myapprole
    mode: one-per-matching-host

Note also the other two environment variables, as they are also of some importance:

  • API_CA_CERT - This variable will be assigned the CA certificate for the API server. Without it, the Python client will be unable to verify the API server certificate and will refuse to connect.
  • PYTHONPATH - This is just for convenience, to allow Python applications to import modules from the /myapp directory with no extra hassle.

Authenticating from Python

The first thing your application needs to do is to authenticate with supd:

import os
import avassa_client

role_id = '324bc0bf-40c3-4aa5-bb58-360b830405ac'
approle_secret = os.environ['APPROLE_SECRET_ID']
session = avassa_client.approle_login(host='https://api.internal:4646',
                                      role_id=role_id,
                                      secret_id=approle_secret,
                                      user_agent='myapp/0.0.0')

The arguments sent to approle_login should be mostly self-explanatory:

  • host - The host to connect to. api.internal will resolve to the correct IP address inside the container.
  • role_id - The role-id as retrieved above. It is hard-coded in this example.
  • secret_id - The approle secret is retrieved from the environment variable APPROLE_SECRET_ID.
  • user_agent - The user agent parameter is optional but its value will be recorded in the audit trail log, so setting it to something unique can help with troubleshooting later.

If authentication is successful, a Session object containing an authentication token is returned.
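
If APPROLE_SECRET_ID is missing from the environment, the os.environ lookup above fails with a bare KeyError. The following is a minimal sketch of a helper that checks the environment first and collects the login arguments in one place. The function name approle_login_kwargs is hypothetical and not part of avassa_client; the helper makes no network calls itself.

```python
import os

def approle_login_kwargs(role_id, env=None):
    """Collect the keyword arguments passed to approle_login in the example above.

    Hypothetical helper for illustration: it only reads the environment.
    """
    env = os.environ if env is None else env
    secret = env.get('APPROLE_SECRET_ID')
    if not secret:
        # Fail early with a message that points at the likely cause.
        raise RuntimeError('APPROLE_SECRET_ID is not set; check the env '
                           'section of the application specification')
    return {
        'host': 'https://api.internal:4646',
        'role_id': role_id,
        'secret_id': secret,
        'user_agent': 'myapp/0.0.0',
    }

# Intended use inside the container:
#   session = avassa_client.approle_login(**approle_login_kwargs(role_id))
```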

The Python client

The Volga python client, just like the websockets module it is built upon, uses an asynchronous programming model based on coroutines. It is possible to run the examples below in a python shell, but they need to be wrapped in an event loop. For example, to run the produce_stuff coroutine below:

import asyncio

asyncio.run(produce_stuff(session))
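
An application that both produces and consumes can run several coroutines in the same event loop with asyncio.gather. The sketch below shows the pattern using stand-in coroutines and an asyncio.Queue in place of Volga; fake_producer and fake_consumer are placeholders invented for this illustration, not part of the client.

```python
import asyncio

async def fake_producer(queue, count):
    # Stand-in for a producer coroutine: emits a few messages, then a sentinel.
    for counter in range(1, count + 1):
        await queue.put({'counter': counter})
        await asyncio.sleep(0)  # yield control to the event loop
    await queue.put(None)

async def fake_consumer(queue):
    # Stand-in for a consumer coroutine: collects messages until the sentinel.
    received = []
    while (msg := await queue.get()) is not None:
        received.append(msg['counter'])
    return received

async def main():
    queue = asyncio.Queue()
    # Both coroutines run concurrently in the same event loop.
    _, received = await asyncio.gather(fake_producer(queue, 3),
                                       fake_consumer(queue))
    return received

result = asyncio.run(main())
print(result)  # [1, 2, 3]
```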

A Volga producer

There is no explicit create request for Volga topics. A topic is created by attempting to connect to it and specifying "on-no-exists": "create" in the json request message. Using the python client it looks like this:

import socket
import avassa_client.volga as volga

async def produce_stuff(session):
    myhostname = socket.gethostname()
    create_opts = volga.CreateOptions.create(fmt='json', replication_factor=3)
    topic = volga.Topic.local('mytopic')
    async with volga.Producer(session=session,
                              producer_name=myhostname,
                              topic=topic,
                              on_no_exists=create_opts) as producer:
        pass  # Do something with the producer here later

The parameters of the Producer constructor are the following:

  • session - The session object returned during authentication.
  • producer_name - The name of this producer. It must be unique among all producers on the topic.
  • topic - This specifies not only the name of the topic to be opened, but also where it should reside. Topic.local is probably the most common option and means that the topic should be opened (or created) at the local site.
  • on_no_exists - What to do if the topic does not already exist. CreateOptions.create(fmt='json', replication_factor=3), as used here, means that the topic should be created with all topic options set to their default values, except format and replication-factor, which are set to json and 3 respectively.

This will attempt to open a producer on the topic mytopic. If the topic does not exist, it will be created and its format will be set to json. If the topic already exists with a format other than json or a replication factor other than 3, the request will fail.

Aside from possibly creating the topic, the producer object serves only one purpose, and that is to produce messages. Messages are sent using the produce coroutine, and seeing as the topic format is json, the message payload is expected to be a python dictionary. Expanding upon the produce_stuff coroutine above:

import asyncio
import socket
import avassa_client.volga as volga

async def produce_stuff(session):
    myhostname = socket.gethostname()
    create_opts = volga.CreateOptions.create(fmt='json', replication_factor=3)
    topic = volga.Topic.local('mytopic')
    async with volga.Producer(session=session,
                              producer_name=myhostname,
                              topic=topic,
                              on_no_exists=create_opts) as producer:
        counter = 1
        while True:
            # The topic format is json, so the payload is a dictionary.
            msg = {'message': "I'm sending a message!",
                   'counter': counter}
            await producer.produce(msg)
            await asyncio.sleep(10)
            counter += 1

This will produce a (slightly inane) message on mytopic every ten seconds, the payload being the msg dictionary serialized to json. The first message will look like this:

{
  "message": "I'm sending a message!",
  "counter": 1
}
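
Because the payload has to be representable as JSON, it can be useful to check a dictionary before producing it. The following sketch uses the standard json module for that check; whether the client uses the same serializer internally is an assumption, and is_json_serializable is a hypothetical helper name.

```python
import json
from datetime import datetime, timezone

def is_json_serializable(payload):
    """Return True if the standard json module can serialize the payload."""
    try:
        json.dumps(payload)
    except (TypeError, ValueError):
        return False
    return True

msg = {'message': "I'm sending a message!", 'counter': 1}
print(json.dumps(msg, indent=2))

ok = is_json_serializable(msg)
# A datetime object is not a JSON type and would need converting first,
# for example with .isoformat().
bad = is_json_serializable({'when': datetime.now(timezone.utc)})
print(ok, bad)  # True False
```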

A Volga consumer

A consumer can be created in a similar fashion; only the parameters are slightly different:

import socket
import avassa_client.volga as volga

async def consume_stuff(session):
    myhostname = socket.gethostname()
    create_opts = volga.CreateOptions.wait()
    topic = volga.Topic.local('mytopic')
    position = volga.Position.beginning()
    async with volga.Consumer(session=session,
                              consumer_name=myhostname,
                              mode='exclusive',
                              position=position,
                              topic=topic,
                              on_no_exists=create_opts) as consumer:
        await consumer.more(1)
        while True:
            msg = await consumer.recv()
            payload = msg['payload']
            print(f'I have received a message: {payload}')

The Consumer constructor itself takes two additional parameters compared to the Producer:

  • mode - How to handle multiple consumers with the same name. exclusive means that there can only be one consumer with each name.
  • position - An object describing where in the message stream to begin consuming messages. beginning, as used here, means that the consumer wants all messages on the topic, starting with the oldest one.

Note also that create_opts is set to CreateOptions.wait() here, as opposed to CreateOptions.create(fmt='json', replication_factor=3) in the producer. This means that if the topic does not exist when this code runs, the consumer will still be created successfully, but it won't receive any messages until the topic is created by someone else. It would be perfectly legal to use CreateOptions.create(fmt='json', replication_factor=3), just like in the producer, but the benefit of using wait here is that this piece of code doesn't need to know the specific topic options, such as the replication factor.

The first thing a consumer needs to do in order to receive any messages is to send a more(n) request, indicating how many messages it is ready to receive. Messages can then be received one by one by calling the recv coroutine. As you can see, the example above makes no further calls to more after the initial one. This is because, by default, recv will automatically issue more(10) requests when needed. The recv coroutine has a single optional parameter, auto_more, which controls the n value of its automatic more requests. It defaults to 10; if auto_more is set to 0, no more requests are issued automatically.
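
The interplay between more and recv can be pictured as a credit counter. The toy model below is only an illustration of that idea under stated assumptions: ToyConsumer is a hypothetical class, not the avassa_client implementation, and it raises an error where a real consumer without credit would simply wait.

```python
import asyncio
from collections import deque

class ToyConsumer:
    """Toy model of credit-based delivery; not the real client."""

    def __init__(self, backlog):
        self._backlog = deque(backlog)  # messages waiting on the "server"
        self._credit = 0                # messages the server may still send
        self.more_requests = []         # record of every more(n) issued

    async def more(self, n):
        self.more_requests.append(n)
        self._credit += n

    async def recv(self, auto_more=10):
        if self._credit == 0:
            if auto_more == 0:
                # Simplification: the toy fails instead of waiting.
                raise RuntimeError('no credit left and auto_more is 0')
            await self.more(auto_more)
        self._credit -= 1
        return self._backlog.popleft()

async def demo():
    consumer = ToyConsumer(range(25))
    await consumer.more(1)  # initial request, as in the example above
    received = [await consumer.recv() for _ in range(25)]
    return received, consumer.more_requests

received, more_requests = asyncio.run(demo())
print(more_requests)  # [1, 10, 10, 10]
```

In this model, the single explicit more(1) covers the first message, and every later batch of ten is requested by recv itself.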

It is important to remember that the value received from the recv coroutine is a full Volga message object in the form of a python dictionary. It contains a bunch of metadata fields, and the message sent by the producer is found in the payload field. The full message will look something like this:

{
  "time": "2022-03-31T09:16:39.445Z",
  "seqno": 1,
  "remain": 0,
  "payload": {
    "message": "I'm sending a message!",
    "counter": 1
  },
  "producer-name": "host1",
  "mtime": 1648718199445
}
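
The metadata fields can be converted to Python objects when needed. As a sketch, the function below turns the time and mtime fields into timezone-aware datetime values. It assumes that mtime is milliseconds since the Unix epoch, which is consistent with the sample message but is not stated above; parse_times is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

def parse_times(msg):
    """Return the 'time' and 'mtime' fields as aware UTC datetimes."""
    # 'time' is an ISO 8601 string; swap the trailing Z for an explicit
    # offset so that older Python versions can parse it too.
    time_field = datetime.fromisoformat(msg['time'].replace('Z', '+00:00'))
    # Assumption: 'mtime' is milliseconds since 1970-01-01 UTC.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    mtime_field = epoch + timedelta(milliseconds=msg['mtime'])
    return time_field, mtime_field

sample = {
    'time': '2022-03-31T09:16:39.445Z',
    'seqno': 1,
    'remain': 0,
    'payload': {'message': "I'm sending a message!", 'counter': 1},
    'producer-name': 'host1',
    'mtime': 1648718199445,
}

time_field, mtime_field = parse_times(sample)
print(time_field.isoformat())
print(time_field == mtime_field)  # True
```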

Accessing Volga from the outside

Volga can be accessed from outside the Avassa environment, typically by connecting to the Control Tower site, but connecting to an edge site is also an option, assuming you have connectivity. The only difference when connecting from the outside is in how your application authenticates. There are many different options for authenticating from the outside; see the Obtaining a Token section of the Programming with Strongbox tutorial.

Using the query-topics API

The query-topics API is a powerful tool that can be used to obtain and filter logs from multiple edge sites in parallel. The request is always made to the Control Tower. A simple python function that searches for error messages in container logs across all sites could look like this:

import avassa_client.volga as volga

async def consume_query_topics(session):
    query = {"topics": [{"re-match-topic-name": "^system:container-logs:",
                         "filter": {"string": {"match": ["ERROR"]}}}]}
    async with volga.QueryTopicsConsumer(session, query) as qt:
        await qt.more(10)
        while msg := await qt.recv():
            payload = msg['payload']
            print(f'I have received a message: {payload}')

This query will search all topics with names beginning with system:container-logs: and return all messages containing the string ERROR. For a full description of the different parameters that can be used, refer to the websocket API reference. Once the QueryTopicsConsumer has been created, it behaves very much like a regular Consumer object.

The QueryTopicsConsumer constructor takes just two parameters: a session object and a Python dictionary representation of the query itself. There is no need to include the op field, as this is added by the client.
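
Since the query is a plain dictionary, it can be built by an ordinary function. The sketch below wraps the query from the example in a hypothetical helper, container_log_query, and uses Python's re module to show which topic names the anchored pattern would select. The topic names in the list are made up for illustration, and it is an assumption that the server interprets this simple pattern the same way Python does.

```python
import re

def container_log_query(match, topic_re='^system:container-logs:'):
    """Build a query-topics dictionary like the one in the example above."""
    return {'topics': [{'re-match-topic-name': topic_re,
                        'filter': {'string': {'match': [match]}}}]}

query = container_log_query('ERROR')

# Made-up topic names, used only to illustrate the anchored prefix match.
names = ['system:container-logs:myapp.myapp-service-1.myapp-container',
         'system:other-topic',
         'mytopic']
pattern = query['topics'][0]['re-match-topic-name']
selected = [name for name in names if re.search(pattern, name)]
print(selected)
```

Only the first name is selected, because the pattern is anchored at the start of the name and includes the trailing colon.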

The client also sets the field auto-more to false. This is of little consequence as long as you are using the Python client, as it will automatically issue more calls for you, but if you are implementing your own client, it is strongly recommended that you also set auto-more to false. The default value of auto-more is currently true for backward compatibility reasons, but this is subject to change in a future release.

Note also that, unlike regular consumers, the QueryTopicsConsumer will shut down (meaning recv will return None once) when there are no more messages to deliver, unless the follow parameter is set to true in the query parameters.