Programming with Volga
Using Volga from a Python application deployed with Avassa
Approles and policies
In order to use Volga, your application will need to be granted a policy that allows some kind of access to at least one Volga topic. To create a simple policy that allows full access to a single topic, run the following command:
supctl -r create policy policies <<EOF
name: mypolicy
volga:
  topics:
    - name: "mytopic"
      operations:
        create: allow
        delete: allow
        produce: allow
        consume: allow
EOF
This policy allows full access to the topic mytopic. For further information on configuring policies, refer to the Policies howto.
Applications typically authenticate using approles. Each approle can provide one or more policies. Run the following command to create an approle called myapprole and give it the mypolicy policy:
supctl -r create strongbox authentication approles <<EOF
name: myapprole
token-policies: ['mypolicy']
EOF
In order to authenticate as myapprole, the application must provide two system-generated parameters: the role-id for myapprole and the corresponding approle secret. The role-id is generated when the approle is created and can be retrieved with the following command:
supctl show strongbox authentication approles myapprole | grep role-id
role-id: 324bc0bf-40c3-4aa5-bb58-360b830405ac
The role-id can be bundled with your application. The approle secret, on the other hand, is generated when the container is deployed and must be passed to your application through an environment variable defined in the application specification:
name: myapp
services:
  - name: myapp-service
    containers:
      - name: myapp-container
        image: myapp
        cmd: [/myapp/myapp.py]
        env:
          APPROLE_SECRET_ID: "${SYS_APPROLE_SECRET_ID}"
          API_CA_CERT: "${SYS_API_CA_CERT}"
          PYTHONPATH: /myapp
        approle: myapprole
    mode: one-per-matching-host
Note also the other two environment variables, as they are important too:
- API_CA_CERT - This variable will be assigned the CA certificate for the API server. Without it, the Python client will be unable to verify the API server certificate and will refuse to connect.
- PYTHONPATH - This is just for convenience, to allow Python applications to import modules from the /myapp directory with no extra hassle.
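Because the application cannot authenticate without these variables, it can help to check for them at startup and fail with a clear message. The following is a minimal sketch of such a check. The names ApproleConfig and load_approle_config are hypothetical (they are not part of avassa_client), and validating the role-id as a UUID assumes it always has the format shown above.

```python
import os
import uuid
from typing import Mapping, NamedTuple


class ApproleConfig(NamedTuple):
    """Hypothetical container for the values later passed to approle_login."""
    role_id: str
    secret_id: str


# Variables injected through the application specification above.
REQUIRED_VARS = ('APPROLE_SECRET_ID', 'API_CA_CERT')


def load_approle_config(role_id: str,
                        environ: Mapping[str, str] = os.environ) -> ApproleConfig:
    """Fail fast if the environment is incomplete or the role-id is malformed."""
    missing = [name for name in REQUIRED_VARS if not environ.get(name)]
    if missing:
        raise RuntimeError('missing environment variables: ' + ', '.join(missing))
    # uuid.UUID raises ValueError for strings that are not valid UUIDs
    # (assumption: role-ids always look like the one shown above).
    uuid.UUID(role_id)
    return ApproleConfig(role_id=role_id, secret_id=environ['APPROLE_SECRET_ID'])
```

Inside the container, load_approle_config('324bc0bf-40c3-4aa5-bb58-360b830405ac') reads os.environ. Passing an explicit dictionary as environ makes the check easy to exercise outside Avassa.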
Authenticating from Python
The first thing your application needs to do is to authenticate with supd:
import os
import avassa_client
role_id = '324bc0bf-40c3-4aa5-bb58-360b830405ac'
approle_secret = os.environ['APPROLE_SECRET_ID']
session = avassa_client.approle_login(host='https://api.internal:4646',
                                      role_id=role_id,
                                      secret_id=approle_secret,
                                      user_agent='myapp/0.0.0')
The arguments sent to approle_login should be mostly self-explanatory:
- host - The host to connect to. api.internal will resolve to the correct IP inside the container.
- role_id - The role-id as retrieved above, hard-coded in this example.
- secret_id - The approle secret, retrieved from the environment variable APPROLE_SECRET_ID.
- user_agent - The user agent parameter is optional, but its value will be recorded in the audit trail log, so setting it to something unique can help with troubleshooting later.
If authentication is successful, a Session object containing an authentication token is returned.
The Python client
The Volga Python client, just like the websockets module it is built upon, uses an asynchronous programming model based on coroutines. It is possible to run the examples below in a Python shell, but they need to be run inside an event loop. For example, to run the produce_stuff coroutine below:
import asyncio
asyncio.run(produce_stuff(session))
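Both produce_stuff and consume_stuff below loop forever, so asyncio.run will not return on its own. When experimenting in a shell, one option is to bound the run time with the standard library's asyncio.wait_for. The run_for helper below is hypothetical and not part of the Avassa client.

```python
import asyncio


async def run_for(coro, seconds: float):
    """Await coro, cancelling it if it is still running after `seconds`.

    Returns the coroutine's result, or None if the timeout expired.
    """
    try:
        return await asyncio.wait_for(coro, timeout=seconds)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the inner coroutine at this point.
        return None
```

For example, asyncio.run(run_for(produce_stuff(session), 60)) stops the producer after roughly a minute.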
A Volga producer
There is no explicit create request for Volga topics. A topic is created by attempting to connect to it and specifying "on-no-exists": "create" in the JSON request message. Using the Python client, it looks like this:
import socket
import avassa_client.volga as volga
async def produce_stuff(session):
    myhostname = socket.gethostname()
    create_opts = volga.CreateOptions.create(fmt='json', replication_factor=3)
    topic = volga.Topic.local('mytopic')
    async with volga.Producer(session=session,
                              producer_name=myhostname,
                              topic=topic,
                              on_no_exists=create_opts) as producer:
        pass  # Do something with the producer here later
The parameters of the Producer constructor are the following:
- session - The session object returned during authentication.
- producer_name - The name of this producer. It must be unique among all producers on the topic.
- topic - This specifies not only the name of the topic to be opened, but also where it should reside. Topic.local is probably the most common option and means that the topic should be opened (or created) at the local site.
- on_no_exists - What to do if the topic does not already exist. CreateOptions.create(fmt='json', replication_factor=3), as used here, means that the topic should be created with all topic options set to their default values, except format and replication-factor, which are set to json and 3 respectively.
This will attempt to open a producer on the topic mytopic. If the topic does not exist, it will be created with its format set to json and its replication factor set to 3. If the topic already exists with a format other than json or a replication factor other than 3, the request will fail.
Aside from possibly creating the topic, the producer object serves only one purpose, and that is to produce messages. Messages are sent using the produce coroutine, and since the topic format is json, the message payload is expected to be a Python dictionary. Expanding upon the produce_stuff coroutine above:
import asyncio
import socket
import avassa_client.volga as volga
async def produce_stuff(session):
    myhostname = socket.gethostname()
    create_opts = volga.CreateOptions.create(fmt='json', replication_factor=3)
    topic = volga.Topic.local('mytopic')
    async with volga.Producer(session=session,
                              producer_name=myhostname,
                              topic=topic,
                              on_no_exists=create_opts) as producer:
        counter = 0
        while True:
            msg = {'message': "I'm sending a message!",
                   'counter': counter}
            # The client serializes the dictionary to JSON before sending it
            await producer.produce(msg)
            await asyncio.sleep(10)
            counter += 1
This will produce a (slightly inane) message on mytopic every ten seconds, the payload being the msg dictionary serialized to JSON. The first message will look like this:
{
  "message": "I'm sending a message!",
  "counter": 0
}
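The client takes care of serializing the dictionary, but it can be useful to confirm locally that a payload is a dictionary that survives a JSON round trip. Below is a minimal sketch using only the standard json module. encode_payload is a hypothetical helper, and treating the client's encoding as equivalent to json.dumps is an assumption.

```python
import json


def encode_payload(msg):
    """Check that msg is a JSON-serializable dict and return its JSON text.

    Hypothetical helper: it mirrors, but does not call, the client's own encoding.
    """
    if not isinstance(msg, dict):
        raise TypeError(f'payload must be a dict, not {type(msg).__name__}')
    # json.dumps raises TypeError for values such as sets or datetime objects.
    return json.dumps(msg)
```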
A Volga consumer
A consumer can be created in a similar fashion; the parameters are just slightly different:
import socket
import avassa_client.volga as volga
async def consume_stuff(session):
    myhostname = socket.gethostname()
    create_opts = volga.CreateOptions.wait()
    topic = volga.Topic.local('mytopic')
    position = volga.Position.beginning()
    async with volga.Consumer(session=session,
                              consumer_name=myhostname,
                              mode='exclusive',
                              position=position,
                              topic=topic,
                              on_no_exists=create_opts) as consumer:
        await consumer.more(1)
        while True:
            msg = await consumer.recv()
            payload = msg['payload']
            print(f'I have received a message: {payload}')
The Consumer constructor itself takes two additional parameters compared to the Producer:
- mode - How to handle multiple consumers with the same name. exclusive means that there can only be one consumer with each name.
- position - An object describing where in the message stream to begin consuming messages. beginning, as used here, means that the consumer wants all messages on the topic, starting with the oldest one.
Note also that create_opts is set to CreateOptions.wait() here, as opposed to CreateOptions.create(fmt='json', replication_factor=3) in the producer. This means that if the topic does not exist at the time when this code runs, the consumer will still be successfully created; it just won't be able to receive any messages until the topic is created by someone else. It would be perfectly legal to use CreateOptions.create(fmt='json', replication_factor=3), just like in the producer, but the benefit of using wait here is that this piece of code doesn't need to know the specific topic options, such as the replication factor.
The first thing a consumer needs to do in order to receive any messages is to send a more(n) request, indicating how many messages it is ready to receive. Messages can then be received one by one by calling the recv coroutine. As you can see, in the example above there are no further calls to more after the initial one. This is because, by default, recv will automatically issue more(10) requests when needed. The recv coroutine has a single optional parameter, auto_more, controlling the n value of its automatic more requests. It defaults to 10, but if auto_more is set to 0, no automatic more requests will be issued.
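The more/recv interplay is a form of credit-based flow control: the consumer grants credit for n messages, and each delivered message uses up one unit. The toy model below only illustrates the behavior described above. CreditModel is hypothetical, it uses a simplified trigger (an automatic more is sent only once the credit reaches zero), the real client's timing may differ, and no network I/O is involved.

```python
from collections import deque


class CreditModel:
    """Toy, in-memory model of consumer credit (hypothetical; not the client's code)."""

    def __init__(self, backlog):
        self.backlog = deque(backlog)  # messages waiting to be delivered
        self.credit = 0                # messages requested but not yet received
        self.more_requests = []        # n of every more(n) issued, in order

    def more(self, n):
        """Grant credit for n more messages."""
        self.more_requests.append(n)
        self.credit += n

    def recv(self, auto_more=10):
        """Return the next message, topping up credit first if it has run out."""
        if self.credit == 0:
            if auto_more == 0:
                # The real recv would presumably just wait; the model raises instead.
                raise RuntimeError('no credit left and automatic more is disabled')
            self.more(auto_more)
        self.credit -= 1
        return self.backlog.popleft()
```

Mirroring consume_stuff, calling more(1) and then recv() 25 times records the requests [1, 10, 10, 10] and leaves 6 units of unused credit.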
It is important to remember that the value received from the recv coroutine is a full Volga message object in the form of a Python dictionary. It contains several metadata fields, and the message sent by the producer is found in the payload field. The full message will look something like this:
{
  "time": "2022-03-31T09:16:39.445Z",
  "seqno": 1,
  "remain": 0,
  "payload": {
    "message": "I'm sending a message!",
    "counter": 0
  },
  "producer-name": "host1",
  "mtime": 1648718199445
}
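The metadata fields can be used directly. The sample above is consistent with mtime being the time field expressed as milliseconds since the Unix epoch. Assuming that holds in general, the following sketch converts it to a timezone-aware datetime. message_time and format_time are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone

# Start of the Unix epoch, as an aware UTC datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def message_time(msg: dict) -> datetime:
    """Convert the message's mtime (assumed: ms since the epoch) to a UTC datetime."""
    # Integer arithmetic via timedelta avoids float rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=msg['mtime'])


def format_time(dt: datetime) -> str:
    """Render dt in the same style as the message's "time" field."""
    return dt.strftime('%Y-%m-%dT%H:%M:%S.') + f'{dt.microsecond // 1000:03d}Z'
```

Applied to the message above, format_time(message_time(msg)) reproduces the time field, 2022-03-31T09:16:39.445Z.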
Accessing Volga from the outside
Volga can be accessed from outside the Avassa environment, typically by connecting to the control tower site, but connecting to an edge site is also an option, assuming you have connectivity. The only difference when connecting from the outside is in how your application authenticates. There are many different options for authenticating from the outside; see the Obtaining a Token section of the Programming with Strongbox tutorial.
Using the query-topics API
The query-topics API is a powerful tool that can be used to obtain and filter logs from multiple edge sites in parallel. The request is always made to the Control Tower. A simple python function that searches for error messages in container logs across all sites could look like this:
import avassa_client.volga as volga
async def consume_query_topics(session):
    query = {"topics": [{"re-match-topic-name": "^system:container-logs:",
                         "filter": {"string": {"match": ["ERROR"]}}}]}
    async with volga.QueryTopicsConsumer(session, query) as qt:
        await qt.more(10)
        while msg := await qt.recv():
            payload = msg['payload']
            print(f'I have received a message: {payload}')
This query will search all topics with names beginning with system:container-logs: and return all messages containing the string ERROR. For a full description of the different parameters that can be used, refer to the websocket API reference. Once the QueryTopicsConsumer has been created, it behaves very much like a regular Consumer object.
The QueryTopicsConsumer constructor takes just two parameters: a session object and a Python dictionary representation of the query itself. There is no need to include the op field, as this is added by the client.
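Since the query is just a dictionary, it can be built by a small function when the search string varies. The sketch below is hypothetical: error_log_query and topic_selected are not client functions. topic_selected previews the topic-name pattern with Python's re module, which may not treat every expression the same way as the server's regex engine, and the topic names in the test values are made up.

```python
import re

# Same pattern as in consume_query_topics above.
TOPIC_PATTERN = '^system:container-logs:'


def error_log_query(needle: str = 'ERROR') -> dict:
    """Build the query dictionary used above, with a configurable search string."""
    return {'topics': [{'re-match-topic-name': TOPIC_PATTERN,
                        'filter': {'string': {'match': [needle]}}}]}


def topic_selected(topic_name: str) -> bool:
    """Local preview of whether TOPIC_PATTERN selects topic_name."""
    return re.search(TOPIC_PATTERN, topic_name) is not None
```

Passing error_log_query('WARNING') to volga.QueryTopicsConsumer would search for warnings instead of errors.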
The client also sets the field auto-more to false. This is of little consequence as long as you are using the Python client, as it will automatically issue more calls for you, but if you are implementing your own client, it is strongly recommended that you also set auto-more to false.
The default value of auto-more is currently true for backward compatibility reasons, but this is subject to change in a future release.
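For a client of your own, the request that carries the query therefore needs the op field and an explicit auto-more set to false. The following is a minimal sketch of building that JSON text. The op value query-topics is an assumption based on the API's name, so check it against the websocket API reference. Opening the websocket, sending the request, and issuing the follow-up more requests are left out.

```python
import json


def build_query_topics_request(query: dict) -> str:
    """Add the fields the Python client would otherwise add, and encode as JSON.

    Assumption: the op value is "query-topics"; verify against the API reference.
    """
    request = dict(query)              # shallow copy, so the caller's dict is untouched
    request['op'] = 'query-topics'
    request['auto-more'] = False       # opt out of the current default of true
    return json.dumps(request)
```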
Note also that, unlike regular consumers, the QueryTopicsConsumer will shut down (meaning recv will return None once) when there are no more messages to deliver, unless the follow parameter is set to true in the query parameters.