Persistence with Queues
This tutorial builds on the basic concepts introduced in the publish/subscribe tutorial and shows you how to send and receive persistent messages from a PubSub+ queue in a point-to-point fashion.
Assumptions
This tutorial assumes the following:
- You are familiar with Solace PubSub+ core concepts.
- You have access to PubSub+ messaging with the following configuration details:
- Connectivity information for a PubSub+ message-VPN configured for guaranteed messaging support
- Enabled client username and password
- Client-profile enabled with guaranteed messaging permissions.
One simple way to get access to Solace messaging quickly is to create a messaging service in Solace PubSub+ Cloud as outlined here. You can find other ways to get access to Solace messaging below.
Goals
The goal of this tutorial is to understand the following:
- How to programmatically create a durable queue on the PubSub+ message router
- How to send a persistent message to a Solace queue
- How to bind to this queue and receive a persistent message
Get Solace Messaging
This tutorial requires access to Solace PubSub+ messaging and requires that you know several connectivity properties about your Solace messaging. Specifically, you need to know the following:
Resources | Value | Description |
---|---|---|
Host | String | This is the address clients use when connecting to the PubSub+ messaging to send and receive messages. (Format: DNS_NAME:Port or IP:Port ) |
Message VPN | String | The PubSub+ message router Message VPN that this client should connect to. |
Client Username | String | The client username. (See Notes below) |
Client Password | String | The client password. (See Notes below) |
There are several ways you can get access to PubSub+ Messaging and find these required properties.
Option 1: Use PubSub+ Cloud
- Follow these instructions to quickly spin up a cloud-based PubSub+ messaging service for your applications.
- The messaging connectivity information is found in the service details in the connectivity tab. You will need:
  - Host:Port (use the SMF URI)
  - Message VPN
  - Client Username
  - Client Password
Option 2: Start PubSub+ Software
- Follow these instructions to start the PubSub+ Software in leading Clouds, Container Platforms or Hypervisors. The tutorials outline where to download and how to install the PubSub+ Software.
- The messaging connectivity information is as follows:
  - Host: <public_ip> (IP address assigned to the VMR in tutorial instructions)
  - Message VPN: default
  - Client Username: sampleUser (can be any value)
  - Client Password: samplePassword (can be any value)
  Note: By default, the PubSub+ Software "default" message VPN has authentication disabled.
Option 3: Get access to a PubSub+ Appliance
- Contact your PubSub+ appliance administrators and obtain the following:
  - A PubSub+ Message-VPN where you can produce and consume direct and persistent messages
  - The host name or IP address of the Solace appliance hosting your Message-VPN
  - A username and password to access the Solace appliance
Obtaining the Solace PubSub+ API
The repository where this tutorial resides already comes with C API library version 7.7.1.4. However, you should always check for any newer version for download here. The C API is distributed as a gzipped tar file for all supported platforms. To update to a newer version of the API, ensure that the existing core library components are replaced by the newer components.
Connecting to the Solace PubSub+ message router
In order to send or receive messages, an application must connect a PubSub+ session. The PubSub+ session is the basis for all client communication with the PubSub+ message router.
In the Solace messaging API for C (SolClient), a few distinct steps are required to create and connect a Solace session.
- The API must be initialized
- Appropriate asynchronous callbacks must be declared
- A SolClient Context is needed to control application threading
- The SolClient session must be created
Initializing the CCSMP API
To initialize the SolClient API, you call the initialize method with arguments that control logging.
/* solClient needs to be initialized before any other API calls. */
solClient_initialize ( SOLCLIENT_LOG_DEFAULT_FILTER, NULL );
This call must be made prior to making any other calls to the SolClient API. It allows the API to initialize internal state and buffer pools.
SolClient Asynchronous Callbacks
The SolClient API is predominantly an asynchronous API designed for the highest speed and lowest latency. As such, most events and notifications occur through callbacks. To get up and running, the following basic callbacks are required at a minimum.
static int msgCount = 0;
solClient_rxMsgCallback_returnCode_t
sessionMessageReceiveCallback ( solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p )
{
printf ( "Received message:\n" );
solClient_msg_dump ( msg_p, NULL, 0 );
printf ( "\n" );
msgCount++;
return SOLCLIENT_CALLBACK_OK;
}
void
sessionEventCallback ( solClient_opaqueSession_pt opaqueSession_p,
solClient_session_eventCallbackInfo_pt eventInfo_p, void *user_p )
{
printf("Session EventCallback() called: %s\n", solClient_session_eventToString ( eventInfo_p->sessionEvent));
}
The sessionMessageReceiveCallback is invoked for each message received by the Session. In this sample, the message is printed to the screen.
The sessionEventCallback is invoked for various significant session events like connection, disconnection, and other SolClient session events. In this sample, it simply prints the events. See the SolClient API documentation and samples for details on the session events.
Context Creation
As outlined in the core concepts, the context object is used to control threading that drives network I/O and message delivery, and it acts as a container for sessions. The easiest way to create a context is to use the context initializer with default thread creation.
/* Context */
solClient_opaqueContext_pt context_p;
solClient_context_createFuncInfo_t contextFuncInfo = SOLCLIENT_CONTEXT_CREATEFUNC_INITIALIZER;
solClient_context_create ( SOLCLIENT_CONTEXT_PROPS_DEFAULT_WITH_CREATE_THREAD,
&context_p, &contextFuncInfo, sizeof ( contextFuncInfo ) );
Session Creation
Finally, a session is needed to actually connect to the PubSub+ message router. This is accomplished by creating a properties array, creating the session from it, and connecting the session.
/* Session */
solClient_opaqueSession_pt session_p;
solClient_session_createFuncInfo_t sessionFuncInfo = SOLCLIENT_SESSION_CREATEFUNC_INITIALIZER;
/* Session Properties */
const char *sessionProps[50] = {0, };
int propIndex = 0;
char *username,*password,*vpnname,*host;
/* Configure the Session function information. */
sessionFuncInfo.rxMsgInfo.callback_p = sessionMessageReceiveCallback;
sessionFuncInfo.rxMsgInfo.user_p = NULL;
sessionFuncInfo.eventInfo.callback_p = sessionEventCallback;
sessionFuncInfo.eventInfo.user_p = NULL;
/* Configure the Session properties. */
propIndex = 0;
host = argv[1];
vpnname = argv[2];
username = strsep(&vpnname,"@");
password = argv[3];
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_HOST;
sessionProps[propIndex++] = host;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_VPN_NAME;
sessionProps[propIndex++] = vpnname;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_USERNAME;
sessionProps[propIndex++] = username;
sessionProps[propIndex++] = SOLCLIENT_SESSION_PROP_PASSWORD;
sessionProps[propIndex++] = password;
sessionProps[propIndex++] = NULL;
/* Create the Session. */
solClient_session_create ( ( char ** ) sessionProps,
context_p,
&session_p, &sessionFuncInfo, sizeof ( sessionFuncInfo ) );
/* Connect the Session. */
solClient_session_connect ( session_p );
printf ( "Connected.\n" );
When creating the session, the factory method takes the session properties, the context, the session pointer, and information about the session callback functions. The API then creates the session within the supplied context and returns a reference in the session pointer. The final call to solClient_session_connect establishes the connection to the PubSub+ message router, which makes the session ready for use.
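The session-properties snippet above splits argv[2] on '@' with strsep, which is not part of standard C and may be missing on some platforms. If the argument contains no '@', strsep also leaves vpnname set to NULL, and a NULL stored in the property array would look like the end of the list. The following is a minimal sketch of a portable alternative that uses only strchr and reports malformed input. The helper name splitUserAtVpn is hypothetical and not part of the Solace API.

```c
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper (not part of the Solace API): split "user@vpn" in place.
 * On success, *user_p points at the text before the first '@' and *vpn_p at
 * the text after it. Returns 0 on success, or -1 if an argument is NULL, the
 * input has no '@', or the user or VPN part is empty.
 */
int
splitUserAtVpn ( char *arg, char **user_p, char **vpn_p )
{
    char *at;

    if ( arg == NULL || user_p == NULL || vpn_p == NULL ) {
        return -1;
    }
    at = strchr ( arg, '@' );
    if ( at == NULL || at == arg || at[1] == '\0' ) {
        return -1;              /* Reject "vpn", "@vpn" and "user@". */
    }
    *at = '\0';                 /* Terminate the username in place. */
    *user_p = arg;
    *vpn_p = at + 1;
    return 0;
}
```

Because the split happens in place, the input must be a writable string such as an argv entry. A caller could print a usage message and exit when the helper returns -1 instead of passing a NULL value to the session properties.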
At this point your client is connected to the PubSub+ message router. You can use PubSub+ Manager to view the client connection and related details.
Provisioning a Queue through the API
The first requirement for guaranteed messaging using a Solace message router is to provision a guaranteed message endpoint. For this tutorial we will use a point-to-point queue. To learn more about different guaranteed message endpoints see here.
Durable endpoints are not created automatically on Solace message routers. However, there are two ways to provision them.
- Using the management interface
- Using the APIs
Using the Solace PubSub+ APIs to provision an endpoint can be a convenient way of getting started quickly without needing to become familiar with the management interface, which is why it is used in this tutorial. However, the management interface provides more options to control the queue properties, so it generally becomes the preferred method over time.
Provisioning an endpoint through the API requires the “Guaranteed Endpoint Create” permission in the client-profile. You can confirm this is enabled by looking at the client profile in PubSub+ Manager.
Provisioning the queue involves three steps.
- Check if endpoint provisioning is supported.
/* Check if the endpoint provisioning is supported */
if ( !solClient_session_isCapable ( session_p, SOLCLIENT_SESSION_CAPABILITY_ENDPOINT_MANAGEMENT ) ) {
    printf ( "Endpoint management not supported on this appliance.\n" );
    return -1;
}
- Set the properties for your queue. This example sets the permission and the quota of the endpoint. More detail on the possible settings can be found in developer documentation.
/* Provision Properties */
const char *provProps[20] = {0, };
int provIndex;
/* Configure the Provision properties */
provIndex = 0;
provProps[provIndex++] = SOLCLIENT_ENDPOINT_PROP_ID;
provProps[provIndex++] = SOLCLIENT_ENDPOINT_PROP_QUEUE;
provProps[provIndex++] = SOLCLIENT_ENDPOINT_PROP_NAME;
provProps[provIndex++] = argv[5];
provProps[provIndex++] = SOLCLIENT_ENDPOINT_PROP_PERMISSION;
provProps[provIndex++] = SOLCLIENT_ENDPOINT_PERM_DELETE;
provProps[provIndex++] = SOLCLIENT_ENDPOINT_PROP_QUOTA_MB;
provProps[provIndex++] = "100";
- Proceed to provision the queue.
/* Queue Network Name to be used with "solClient_session_endpointProvision()" */
char qNN[80];
/* Try to provision the Queue. Ignore if already exists */
solClient_session_endpointProvision ( ( char ** ) provProps,
session_p,
SOLCLIENT_PROVISION_FLAGS_WAITFORCONFIRM|
SOLCLIENT_PROVISION_FLAGS_IGNORE_EXIST_ERRORS,
NULL, qNN, sizeof ( qNN ) );
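The session, provision, and flow property lists in this tutorial are all flat arrays of name/value strings that end with a NULL entry, filled by hand with an index variable. The following is a minimal sketch of one way to build such a list with bounds checking, so that a pair is never written past the end of the array and the list always stays NULL-terminated. The type propList_t and the functions propListInit and propListAdd are hypothetical helpers and not part of the Solace API. The capacity of 20 simply mirrors the array size used in the snippets.

```c
#include <stddef.h>

/* Hypothetical helper type (not part of the Solace API): a fixed-capacity,
 * NULL-terminated list of name/value string pairs. */
#define PROP_LIST_CAPACITY 20

typedef struct {
    const char *items[PROP_LIST_CAPACITY];  /* name, value, ..., NULL */
    size_t      count;                      /* entries used, excluding NULL */
} propList_t;

/* Reset the list so that it is empty and NULL-terminated. */
void
propListInit ( propList_t *list_p )
{
    size_t i;

    for ( i = 0; i < PROP_LIST_CAPACITY; i++ ) {
        list_p->items[i] = NULL;
    }
    list_p->count = 0;
}

/* Append one name/value pair. Returns 0 on success, or -1 if an argument is
 * NULL or there is no room left for the pair plus the terminating NULL. */
int
propListAdd ( propList_t *list_p, const char *name, const char *value )
{
    if ( list_p == NULL || name == NULL || value == NULL ) {
        return -1;
    }
    if ( list_p->count + 3 > PROP_LIST_CAPACITY ) {
        return -1;
    }
    list_p->items[list_p->count++] = name;
    list_p->items[list_p->count++] = value;
    list_p->items[list_p->count] = NULL;    /* Keep the list terminated. */
    return 0;
}
```

With a helper like this, each pair from the snippets becomes one propListAdd call, and the finished list could be passed as ( char ** ) list.items wherever the snippets pass a property array. Rejecting NULL values also guards against a missing command-line argument silently truncating the list.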
Sending a message to a queue
Now it is time to send a message to the queue.
To send a message, you must create a message and a queue destination. This tutorial will send a PubSub+ binary message with contents "Hello world!". Then you must send the message to the PubSub+ message router.
/* Message */
solClient_opaqueMsg_pt msg_p = NULL;
solClient_destination_t destination;
const char *text_p = "Hello world!";
/* Allocate a message. */
solClient_msg_alloc ( &msg_p );
/* Set the delivery mode for the message. */
solClient_msg_setDeliveryMode ( msg_p, SOLCLIENT_DELIVERY_MODE_PERSISTENT );
/* Set the destination. */
destination.destType = SOLCLIENT_QUEUE_DESTINATION;
destination.dest = argv[5];
solClient_msg_setDestination ( msg_p, &destination, sizeof ( destination ) );
/* Add some content to the message. */
solClient_msg_setBinaryAttachment ( msg_p, text_p, ( solClient_uint32_t ) strlen ( (char *)text_p ) );
/* Send the message. */
printf ( "About to send message '%s' to queue '%s'...\n", (char *)text_p, argv[5] );
solClient_session_sendMsg ( session_p, msg_p );
printf ( "Message sent.\n" );
/* Free the message. */
solClient_msg_free ( &msg_p );
The message is transferred to the PubSub+ message router asynchronously, but if all goes well, it will be waiting for your consumer on the queue.
Receiving a message from a queue
Now it is time to receive the messages sent to your queue.
You still need to connect a session just as you did with the publisher. With a connected session, you then need to bind to the PubSub+ message router queue with a flow receiver. Flow receivers allow applications to receive messages from a PubSub+ guaranteed message flow. Flows encapsulate all of the acknowledgement behaviour required for guaranteed messaging. Conveniently, flow receivers have the same interface as message consumers, but flows also require some additional properties on creation.
A flow requires properties. At its most basic, the flow properties require the endpoint (our newly provisioned or existing queue) and an ack mode. In this example you’ll use the client ack mode where the application will explicitly acknowledge each message.
Flows are created from PubSub+ session objects just as direct message consumers are.
Notice the flowMessageReceiveCallback and flowEventCallback callbacks that are passed in when creating a flow. These callbacks will be invoked when a message arrives at the endpoint (the queue) or a flow event occurs.
You must start your flow so it can begin receiving messages.
void
sessionEventCallback ( solClient_opaqueSession_pt opaqueSession_p,
solClient_session_eventCallbackInfo_pt eventInfo_p, void *user_p )
{
}
static void
flowEventCallback ( solClient_opaqueFlow_pt opaqueFlow_p, solClient_flow_eventCallbackInfo_pt eventInfo_p, void *user_p )
{
}
/*************************************************************************
* Create a Flow
*************************************************************************/
/* Flow Properties */
const char *flowProps[20] = {0, };
/* Flow */
solClient_opaqueFlow_pt flow_p;
solClient_flow_createFuncInfo_t flowFuncInfo = SOLCLIENT_FLOW_CREATEFUNC_INITIALIZER;
/* Configure the Flow function information */
flowFuncInfo.rxMsgInfo.callback_p = flowMessageReceiveCallback;
flowFuncInfo.eventInfo.callback_p = flowEventCallback;
/* Configure the Flow properties */
propIndex = 0;
flowProps[propIndex++] = SOLCLIENT_FLOW_PROP_BIND_BLOCKING;
flowProps[propIndex++] = SOLCLIENT_PROP_DISABLE_VAL;
flowProps[propIndex++] = SOLCLIENT_FLOW_PROP_BIND_ENTITY_ID;
flowProps[propIndex++] = SOLCLIENT_FLOW_PROP_BIND_ENTITY_QUEUE;
flowProps[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE;
flowProps[propIndex++] = SOLCLIENT_FLOW_PROP_ACKMODE_CLIENT;
flowProps[propIndex++] = SOLCLIENT_FLOW_PROP_BIND_NAME;
flowProps[propIndex++] = argv[5];
solClient_session_createFlow ( ( char ** ) flowProps,
session_p,
&flow_p, &flowFuncInfo, sizeof ( flowFuncInfo ) );
Both flow properties and endpoint properties are explained in more detail in the developer documentation.
As with direct messaging, messages are delivered through a callback. For a flow, this is the flowMessageReceiveCallback registered when the flow was created.
Summarizing
Combining the example source code shown above results in the following source code files:
Building
Building these examples is simple.
For Linux and Mac
All you need is to execute the compile script in the build folder.
linux
build$ ./build_intro_linux_xxx.sh
mac
build$ ./build_intro_mac_xxx.sh
For Windows
You can build the examples either from a DOS prompt or from the Visual Studio IDE.
To build from a DOS prompt, you must first launch the appropriate Visual Studio Command Prompt and then run the batch file:
c:\solace-sample-c\build>build_intro_win_xxx.bat
Referencing the downloaded SolClient library include and lib file is required. For more advanced build control, consider adapting the makefile found in the "intro" directory of the SolClient package. The above samples very closely mirror the samples found there.
Running the Samples
If you start the QueuePublisher with the required arguments of your PubSub+ messaging, it will publish the message to the specified queue. In the example below, a message is published to a queue q1.
bin$ . ./setenv.sh
bin$ ./QueuePublisher <msg_backbone_ip:port> <message-vpn> <client-username> <password> <queue>
QueuePublisher initializing...
Connected.
About to send message 'Hello world!' to queue 'q1'...
Message sent.
Acknowledgement received!
Exiting.
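The snippets in this tutorial index into argv (up to argv[5] for the queue name) without first checking argc, so a missing argument would lead to reading a NULL or out-of-range pointer. The following is a minimal sketch of an argument check that could run at the top of main, assuming the five arguments listed in the usage line shown above. The helper name checkArgs and the REQUIRED_ARGS constant are hypothetical.

```c
#include <stddef.h>
#include <stdio.h>

/* Number of required arguments, taken from the usage line shown above:
 * host, message VPN, client username, password, queue. */
#define REQUIRED_ARGS 5

/*
 * Hypothetical helper: returns 0 if enough arguments were given. Otherwise
 * prints a usage message to stderr and returns -1.
 */
int
checkArgs ( int argc, char *argv[] )
{
    if ( argc < REQUIRED_ARGS + 1 ) {
        /* Fall back to a fixed program name if argv[0] is unavailable. */
        const char *prog = ( argc > 0 && argv != NULL && argv[0] != NULL )
                           ? argv[0] : "QueuePublisher";

        fprintf ( stderr,
                  "Usage: %s <msg_backbone_ip:port> <message-vpn> "
                  "<client-username> <password> <queue>\n", prog );
        return -1;
    }
    return 0;
}
```

A program could call checkArgs ( argc, argv ) before touching any argv entry and return a non-zero exit status when it fails.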
You can next start the QueueSubscriber with the same queue q1, and if the message has been successfully delivered and queued, the queued message will be consumed and printed out.
bin$ ./QueueSubscriber <msg_backbone_ip:port> <message-vpn> <client-username> <password> <queue>
Connected.
Waiting for messages......
Received message:
Destination: Queue 'q1'
Class Of Service: COS_1
DeliveryMode: PERSISTENT
Message Id: 1
Binary Attachment: len=12
48 65 6c 6c 6f 20 77 6f 72 6c 64 21 Hello wo rld!
Acknowledging message Id: 1.
Exiting.
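The Binary Attachment lines in the output above show the payload length, then the payload as hexadecimal bytes followed by the same bytes as text. As a way to check such a dump by hand, the following sketch formats a buffer as space-separated hex bytes. The helper name formatHex is hypothetical and is unrelated to how solClient_msg_dump is implemented.

```c
#include <stddef.h>
#include <stdio.h>

/*
 * Hypothetical helper: write len bytes from data_p into out_p as lowercase,
 * space-separated hex pairs (for example "48 65"). Returns 0 on success, or
 * -1 if an argument is invalid or the output buffer is too small.
 */
int
formatHex ( const void *data_p, size_t len, char *out_p, size_t outSize )
{
    const unsigned char *bytes = ( const unsigned char * ) data_p;
    size_t used = 0;
    size_t i;

    if ( data_p == NULL || out_p == NULL || outSize == 0 ) {
        return -1;
    }
    out_p[0] = '\0';
    for ( i = 0; i < len; i++ ) {
        /* Two hex digits per byte, plus a leading space after the first. */
        size_t need = ( i == 0 ) ? 2 : 3;

        if ( used + need + 1 > outSize ) {
            return -1;          /* No room for this byte and the '\0'. */
        }
        snprintf ( out_p + used, outSize - used,
                   ( i == 0 ) ? "%02x" : " %02x", ( unsigned int ) bytes[i] );
        used += need;
    }
    return 0;
}
```

Formatting the 12 bytes of "Hello world!" this way gives 48 65 6c 6c 6f 20 77 6f 72 6c 64 21, which matches the hex column and the len=12 value in the subscriber output.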
You can also run QueuePublisher a few more times to let the queue build up a little before running QueueSubscriber. The QueueSubscriber will consume all queued messages and display them.
You have now successfully connected a client, sent persistent messages to a queue and received and acknowledged them.