Creating a subscription and receiving data
This example demonstrates the complete workflow for creating a subscription and receiving data through the Local Actor API v2. The workflow follows the pattern: CREATE → POLL → CONNECT → USE, where you first create a subscription via the REST API, poll for its status until it's ready, then connect to the provided AMQP endpoint to receive messages.
Prerequisites
General Requirements
- Valid client certificate and private key in PEM format
- CA certificate in PEM format
- Access to a Local Actor API v2 instance
Language-Specific Requirements
- Python 3.x
- python-qpid-proton library
- requests library
- .NET 6.0 or later
- AMQPNetLite NuGet package
- Go 1.18 or later
- github.com/Azure/go-amqp package
- Java 11 or later
- Maven 3.6 or later
- Apache Qpid Proton-J library
- OkHttp library
- Jackson library
- Bouncy Castle library
- Node.js 16 or later
- rhea library
Environment Variables
Variable | Description | Example |
---|---|---|
ACTOR_API_HOST |
Hostname of the Actor API instance | api.example.com |
ACTOR_API_PORT |
Port of the Actor API instance | 443 |
ACTOR_API_SUBSCRIPTION_SELECTOR |
Selector for the subscription to create | messageType = 'TEST' |
ACTOR_COMMON_NAME |
Common name from the actor client certificate | actor.example.com |
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM |
Path to client certificate and private key in PEM format | /path/to/client-cert-and-key.pem |
CA_CERTIFICATE_PEM |
Path to CA certificate in PEM format | /path/to/ca-cert.pem |
Configuration
ACTOR_API_HOST=os.environ.get("ACTOR_API_HOST", "hostname_of_the_actor_api")
ACTOR_API_PORT=os.environ.get("ACTOR_API_PORT", "port_of_the_actor_api")
ACTOR_API_SUBSCRIPTION_SELECTOR=os.environ.get("ACTOR_API_SUBSCRIPTION_SELECTOR", "selector_of_the_subscription")
ACTOR_COMMON_NAME=os.environ.get("ACTOR_COMMON_NAME", "cn_of_the_actor_client_certificate")
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM=os.environ.get("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM=os.environ.get("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
private static readonly string ACTOR_API_HOST = Environment.GetEnvironmentVariable("ACTOR_API_HOST") ?? "hostname_of_the_actor_api";
private static readonly string ACTOR_API_PORT = Environment.GetEnvironmentVariable("ACTOR_API_PORT") ?? "port_of_the_actor_api";
private static readonly string ACTOR_API_SUBSCRIPTION_SELECTOR = Environment.GetEnvironmentVariable("ACTOR_API_SUBSCRIPTION_SELECTOR") ?? "selector_of_the_subscription";
private static readonly string ACTOR_COMMON_NAME = Environment.GetEnvironmentVariable("ACTOR_COMMON_NAME") ?? "cn_of_the_actor_client_certificate";
private static readonly string ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = Environment.GetEnvironmentVariable("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM") ?? "pem_with_x509_certificate_chain_and_private_key";
private static readonly string CA_CERTIFICATE_PEM = Environment.GetEnvironmentVariable("CA_CERTIFICATE_PEM") ?? "pem_with_x509_certificate";
var (
ACTOR_API_HOST = getEnv("ACTOR_API_HOST", "hostname_of_the_actor_api")
ACTOR_API_PORT = getEnv("ACTOR_API_PORT", "port_of_the_actor_api")
ACTOR_API_SUBSCRIPTION_SELECTOR = getEnv("ACTOR_API_SUBSCRIPTION_SELECTOR", "selector_of_the_subscription")
ACTOR_COMMON_NAME = getEnv("ACTOR_COMMON_NAME", "cn_of_the_actor_client_certificate")
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
)
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
private static final String ACTOR_API_HOST = getEnv("ACTOR_API_HOST", "hostname_of_the_actor_api");
private static final String ACTOR_API_PORT = getEnv("ACTOR_API_PORT", "port_of_the_actor_api");
private static final String ACTOR_API_SUBSCRIPTION_SELECTOR = getEnv("ACTOR_API_SUBSCRIPTION_SELECTOR", "selector_of_the_subscription");
private static final String ACTOR_COMMON_NAME = getEnv("ACTOR_COMMON_NAME", "cn_of_the_actor_client_certificate");
private static final String ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key");
private static final String CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate");
private static final ObjectMapper objectMapper = new ObjectMapper();
private static OkHttpClient httpClient;
private static String getEnv(String key, String defaultValue) {
String value = System.getenv(key);
return value != null ? value : defaultValue;
}
const ACTOR_API_HOST = process.env.ACTOR_API_HOST || 'hostname_of_the_actor_api';
const ACTOR_API_PORT = process.env.ACTOR_API_PORT || 'port_of_the_actor_api';
const ACTOR_API_SUBSCRIPTION_SELECTOR = process.env.ACTOR_API_SUBSCRIPTION_SELECTOR || 'selector_of_the_subscription';
const ACTOR_COMMON_NAME = process.env.ACTOR_COMMON_NAME || 'cn_of_the_actor_client_certificate';
const ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = process.env.ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM || 'pem_with_x509_certificate_chain_and_private_key';
const CA_CERTIFICATE_PEM = process.env.CA_CERTIFICATE_PEM || 'pem_with_x509_certificate';
Explanation
Configuration by environment variables
API Functions
def api_url(endpoint):
return "https://%s:%s/%s/%s" % (ACTOR_API_HOST, ACTOR_API_PORT, ACTOR_COMMON_NAME, endpoint)
def api_get(endpoint):
return requests.get(api_url(endpoint), verify=CA_CERTIFICATE_PEM, cert=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
def api_post(endpoint, json_data):
return requests.post(api_url(endpoint), None, json_data, verify=CA_CERTIFICATE_PEM, cert=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
def api_delete(endpoint):
return requests.delete(api_url(endpoint), verify=CA_CERTIFICATE_PEM, cert=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
def api_get_subscription(id):
return api_get("subscriptions/%s" % id)
def api_delete_subscription(id):
return api_delete("subscriptions/%s" % id)
def api_create_subscription():
json_data = {
"selector": ACTOR_API_SUBSCRIPTION_SELECTOR
}
return api_post("subscriptions", json_data)
private static HttpClient CreateHttpClient()
{
var handler = new HttpClientHandler();
try
{
// Load client certificate using the same method as AMQP
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
handler.ClientCertificates.Add(clientCert);
// Load CA certificate for server validation
var caCertPem = File.ReadAllText(CA_CERTIFICATE_PEM);
var caCert = X509Certificate2.CreateFromPem(caCertPem);
handler.ServerCertificateCustomValidationCallback = (sender, cert, chain, errors) => {
return chain.ChainElements[chain.ChainElements.Count - 1].Certificate.Equals(caCert);
};
}
catch (Exception ex)
{
LogError($"Error loading certificates for HTTP client: {ex.Message}");
throw;
}
return new HttpClient(handler);
}
private static string ApiUrl(string endpoint)
{
return $"https://{ACTOR_API_HOST}:{ACTOR_API_PORT}/{ACTOR_COMMON_NAME}/{endpoint}";
}
private static async Task<HttpResponseMessage> ApiGetAsync(string endpoint, HttpClient client)
{
return await client.GetAsync(ApiUrl(endpoint));
}
private static async Task<HttpResponseMessage> ApiPostAsync(string endpoint, object jsonData, HttpClient client)
{
var json = JsonSerializer.Serialize(jsonData);
var content = new StringContent(json, Encoding.UTF8, "application/json");
return await client.PostAsync(ApiUrl(endpoint), content);
}
private static async Task<HttpResponseMessage> ApiDeleteAsync(string endpoint, HttpClient client)
{
return await client.DeleteAsync(ApiUrl(endpoint));
}
private static async Task<HttpResponseMessage> ApiGetSubscriptionAsync(string id, HttpClient client)
{
return await ApiGetAsync($"subscriptions/{id}", client);
}
private static async Task<HttpResponseMessage> ApiDeleteSubscriptionAsync(string id, HttpClient client)
{
return await ApiDeleteAsync($"subscriptions/{id}", client);
}
private static async Task<HttpResponseMessage> ApiCreateSubscriptionAsync(HttpClient client)
{
var jsonData = new { selector = ACTOR_API_SUBSCRIPTION_SELECTOR };
return await ApiPostAsync("subscriptions", jsonData, client);
}
func apiURL(endpoint string) string {
return fmt.Sprintf("https://%s:%s/%s/%s", ACTOR_API_HOST, ACTOR_API_PORT, ACTOR_COMMON_NAME, endpoint)
}
func createHTTPClient() *http.Client {
cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
if err != nil {
log.Fatalf("Failed to load client certificate: %v", err)
}
caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
if err != nil {
log.Fatalf("Failed to read CA certificate: %v", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS13,
}
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
}
func apiGet(endpoint string) (*http.Response, error) {
client := createHTTPClient()
return client.Get(apiURL(endpoint))
}
func apiPost(endpoint string, jsonData map[string]interface{}) (*http.Response, error) {
client := createHTTPClient()
data, err := json.Marshal(jsonData)
if err != nil {
return nil, err
}
req, err := http.NewRequest("POST", apiURL(endpoint), strings.NewReader(string(data)))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/json")
return client.Do(req)
}
func apiDelete(endpoint string) (*http.Response, error) {
client := createHTTPClient()
req, err := http.NewRequest("DELETE", apiURL(endpoint), nil)
if err != nil {
return nil, err
}
return client.Do(req)
}
func apiGetSubscription(id string) (*http.Response, error) {
return apiGet(fmt.Sprintf("subscriptions/%s", id))
}
func apiDeleteSubscription(id string) (*http.Response, error) {
return apiDelete(fmt.Sprintf("subscriptions/%s", id))
}
func apiCreateSubscription() (*http.Response, error) {
jsonData := map[string]interface{}{
"selector": ACTOR_API_SUBSCRIPTION_SELECTOR,
}
return apiPost("subscriptions", jsonData)
}
private static String apiUrl(String endpoint) {
return String.format("https://%s:%s/%s/%s", ACTOR_API_HOST, ACTOR_API_PORT, ACTOR_COMMON_NAME, endpoint);
}
private static Response apiGet(String endpoint) throws IOException {
Request request = new Request.Builder()
.url(apiUrl(endpoint))
.build();
return httpClient.newCall(request).execute();
}
private static Response apiPost(String endpoint, String jsonData) throws IOException {
RequestBody body = RequestBody.create(jsonData, MediaType.parse("application/json"));
Request request = new Request.Builder()
.url(apiUrl(endpoint))
.post(body)
.build();
return httpClient.newCall(request).execute();
}
private static Response apiDelete(String endpoint) throws IOException {
Request request = new Request.Builder()
.url(apiUrl(endpoint))
.delete()
.build();
return httpClient.newCall(request).execute();
}
private static Response apiGetSubscription(String id) throws IOException {
return apiGet("subscriptions/" + id);
}
private static Response apiDeleteSubscription(String id) throws IOException {
return apiDelete("subscriptions/" + id);
}
private static Response apiCreateSubscription() throws IOException {
Map<String, Object> jsonData = new HashMap<>();
jsonData.put("selector", ACTOR_API_SUBSCRIPTION_SELECTOR);
String json = objectMapper.writeValueAsString(jsonData);
return apiPost("subscriptions", json);
}
function apiUrl(endpoint) {
return `https://${ACTOR_API_HOST}:${ACTOR_API_PORT}/${ACTOR_COMMON_NAME}/${endpoint}`;
}
function createHttpsAgent() {
return new https.Agent({
cert: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
key: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
ca: [fs.readFileSync(CA_CERTIFICATE_PEM)],
rejectUnauthorized: true
});
}
function apiRequest(method, endpoint, data = null) {
return new Promise((resolve, reject) => {
const url = new URL(apiUrl(endpoint));
const agent = createHttpsAgent();
const options = {
hostname: url.hostname,
port: url.port,
path: url.pathname,
method: method,
agent: agent,
headers: {
'Content-Type': 'application/json'
}
};
const req = https.request(options, (res) => {
let responseData = '';
res.on('data', (chunk) => {
responseData += chunk;
});
res.on('end', () => {
const response = {
statusCode: res.statusCode,
ok: res.statusCode >= 200 && res.statusCode < 300,
data: responseData,
json: () => JSON.parse(responseData)
};
resolve(response);
});
});
req.on('error', (error) => {
reject(error);
});
if (data) {
req.write(JSON.stringify(data));
}
req.end();
});
}
function apiGet(endpoint) {
return apiRequest('GET', endpoint);
}
function apiPost(endpoint, jsonData) {
return apiRequest('POST', endpoint, jsonData);
}
function apiDelete(endpoint) {
return apiRequest('DELETE', endpoint);
}
function apiGetSubscription(id) {
return apiGet(`subscriptions/${id}`);
}
function apiDeleteSubscription(id) {
return apiDelete(`subscriptions/${id}`);
}
function apiCreateSubscription() {
const jsonData = {
selector: ACTOR_API_SUBSCRIPTION_SELECTOR
};
return apiPost('subscriptions', jsonData);
}
Explanation
Implementation of the required subscription endpoints (see REST API Reference)
AMQP 1.0 Client
def amqp_create_ssl_config():
ssl_config = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_config.set_peer_authentication(SSLDomain.ANONYMOUS_PEER)
ssl_config.set_credentials(cert_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, key_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, password=None)
ssl_config.set_trusted_ca_db(CA_CERTIFICATE_PEM)
return ssl_config
class Receiver(MessagingHandler):
def __init__(self, endpoint):
super(Receiver, self).__init__()
self.__endpoint = endpoint
def on_start(self, event):
logging.debug("Container reactor started")
container = event.container
endpoint = self.__endpoint
# Step 1: connect
ssl_config = amqp_create_ssl_config()
amqp_url = "amqps://%s:%s" % (endpoint["host"], endpoint["port"])
connection = container.connect(amqp_url, ssl_domain = ssl_config, reconnect = False, heartbeat = 5)
# Step 2: create a receiving link using the source address of the endpoint
container.create_receiver(connection, endpoint["source"])
def on_message(self, event):
# Decode binary body as UTF-8
body_binary = event.message.body
if isinstance(body_binary, bytes):
body_text = body_binary.decode('utf-8')
elif hasattr(body_binary, 'tobytes'):
# Handle memory view objects
body_text = body_binary.tobytes().decode('utf-8')
else:
body_text = str(body_binary)
# Format application properties in sorted order
app_props = {}
if hasattr(event.message, 'properties') and event.message.properties:
app_props = event.message.properties
elif hasattr(event.message, 'application_properties') and event.message.application_properties:
app_props = event.message.application_properties
sorted_props_json = json.dumps(app_props, sort_keys=True)
logging.info("Message received: body='%s', properties=%s", body_text, sorted_props_json)
def amqp_connect_and_listen(endpoint):
receiver = Receiver(endpoint)
container = Container(receiver)
thread = threading.Thread(name = "AMQPClient", target = container.run, daemon = True)
thread.start()
while thread.is_alive():
time.sleep(1)
private static ConnectionFactory CreateConnectionFactory()
{
var factory = new ConnectionFactory();
try
{
// Configure SSL/TLS with client certificate for SASL EXTERNAL
// Read the combined PEM file content
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
factory.SSL.ClientCertificates.Add(clientCert);
// Enable SSL/TLS
factory.SSL.Protocols = System.Security.Authentication.SslProtocols.Tls13;
// Load CA certificate for validation
var caCertPem = File.ReadAllText(CA_CERTIFICATE_PEM);
var caCert = X509Certificate2.CreateFromPem(caCertPem);
factory.SSL.RemoteCertificateValidationCallback = (sender, cert, chain, errors) => {
// Validate against CA certificate
return ValidateCertificate(cert as X509Certificate2, caCert);
};
// Extract common name for SASL EXTERNAL
var commonName = ExtractCommonName(clientCert.Subject);
LogDebug($"Certificate Subject: {clientCert.Subject}");
LogDebug($"Extracted Common Name: {commonName}");
// Configure custom SASL EXTERNAL profile that sends only the CN value
factory.SASL.Profile = new CustomSaslExternalProfile(commonName);
}
catch (Exception ex)
{
LogError($"Error loading certificates: {ex.Message}");
throw;
}
return factory;
}
private static bool ValidateCertificate(X509Certificate2? serverCert, X509Certificate2 caCert)
{
if (serverCert == null || caCert == null)
return false;
var chain = new X509Chain();
chain.ChainPolicy.ExtraStore.Add(caCert);
chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;
return chain.Build(serverCert);
}
private static string ExtractCommonName(string subjectDn)
{
// Extract CN value from Distinguished Name (e.g., "CN=XX99999, O=Company" -> "XX99999")
var parts = subjectDn.Split(',');
foreach (var part in parts)
{
var trimmed = part.Trim();
if (trimmed.StartsWith("CN="))
{
return trimmed.Substring(3);
}
}
return subjectDn; // Fallback to full DN if CN not found
}
// Custom SASL EXTERNAL profile that sends only the CN value
public class CustomSaslExternalProfile : SaslProfile
{
private readonly string identity;
public CustomSaslExternalProfile(string identity) : base(new Symbol("EXTERNAL"))
{
this.identity = identity;
}
protected override DescribedList GetStartCommand(string hostname)
{
return new SaslInit()
{
Mechanism = "EXTERNAL",
InitialResponse = Encoding.UTF8.GetBytes(identity)
};
}
protected override DescribedList OnCommand(DescribedList command)
{
// For EXTERNAL, we should only need the initial response
return null!;
}
protected override ITransport UpgradeTransport(ITransport transport)
{
// No transport upgrade needed for EXTERNAL
return transport;
}
}
private static void PrintMessageDetails(Message message)
{
// Decode binary body as UTF-8
var bodyText = "";
if (message.Body is byte[] bodyBytes)
{
bodyText = Encoding.UTF8.GetString(bodyBytes);
}
else
{
bodyText = message.Body?.ToString() ?? "";
}
// Format application properties as JSON in sorted order
var appPropsJson = "{}";
if (message.ApplicationProperties != null && message.ApplicationProperties.Map != null)
{
var propsDict = new Dictionary<string, object>();
foreach (var kvp in message.ApplicationProperties.Map)
{
propsDict[kvp.Key.ToString()] = kvp.Value;
}
// Sort keys for consistent output
var sortedProps = propsDict.OrderBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value);
appPropsJson = JsonSerializer.Serialize(sortedProps, new JsonSerializerOptions { WriteIndented = false });
}
LogInfo($"Message received: body='{bodyText}', properties={appPropsJson}");
}
private static async Task AmqpConnectAndListenAsync(SubscriptionEndpoint endpoint)
{
var factory = CreateConnectionFactory();
// Extract CN from certificate for user identity
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
var commonName = ExtractCommonName(clientCert.Subject);
// Create connection with proper user identity
var address = new Address($"amqps://{endpoint.Host}:{endpoint.Port}");
var connection = await factory.CreateAsync(address);
var session = new Session(connection);
var receiver = new ReceiverLink(session, "receiver-link", endpoint.Source);
LogInfo("Listening for messages. Press Ctrl+C to stop.");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (!cts.Token.IsCancellationRequested)
{
var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(1));
if (message != null)
{
PrintMessageDetails(message);
receiver.Accept(message);
}
}
}
catch (OperationCanceledException)
{
LogInfo("Stopping message listener...");
}
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
func amqpCreateTLSConfig() *tls.Config {
cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
if err != nil {
log.Fatalf("Failed to load client certificate: %v", err)
}
caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
if err != nil {
log.Fatalf("Failed to read CA certificate: %v", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS13,
}
}
func amqpConnectAndListen(endpoint map[string]interface{}) {
ctx := context.Background()
host := endpoint["host"].(string)
port := int(endpoint["port"].(float64))
source := endpoint["source"].(string)
amqpURL := fmt.Sprintf("amqps://%s:%d", host, port)
conn, err := amqp.Dial(ctx, amqpURL, &amqp.ConnOptions{
TLSConfig: amqpCreateTLSConfig(),
SASLType: amqp.SASLTypeExternal(""),
})
if err != nil {
log.Fatalf("Failed to connect to AMQP server: %v", err)
}
defer conn.Close()
session, err := conn.NewSession(ctx, nil)
if err != nil {
log.Fatalf("Failed to create AMQP session: %v", err)
}
defer session.Close(ctx)
receiver, err := session.NewReceiver(ctx, source, nil)
if err != nil {
log.Fatalf("Failed to create AMQP receiver: %v", err)
}
defer receiver.Close(ctx)
log.Printf("Starting to receive messages from %s. Press Ctrl+C to stop.", source)
for {
msg, err := receiver.Receive(ctx, nil)
if err != nil {
log.Printf("Error receiving message: %v", err)
break
}
// Decode binary body as UTF-8
var bodyText string
if len(msg.Data) > 0 {
bodyBytes := msg.Data[0]
bodyText = string(bodyBytes)
} else {
bodyText = ""
}
// Format application properties in sorted order
appPropsJSON := "{}"
if msg.ApplicationProperties != nil {
appPropsBytes, err := json.Marshal(msg.ApplicationProperties)
if err == nil {
appPropsJSON = string(appPropsBytes)
}
}
log.Printf("Message received: body='%s', properties=%s", bodyText, appPropsJSON)
err = receiver.AcceptMessage(ctx, msg)
if err != nil {
log.Printf("Error accepting message: %v", err)
}
}
}
private static class ReceiverHandler extends BaseHandler {
private final Map<String, Object> endpoint;
private SSLContext sslContext;
public ReceiverHandler(Map<String, Object> endpoint) {
this.endpoint = endpoint;
}
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}
@Override
public void onConnectionInit(Event event) {
logger.fine("Connection initialized");
Connection connection = event.getConnection();
connection.setHostname((String) endpoint.get("host"));
connection.setContainer("java-subscription-example");
connection.open();
}
@Override
public void onConnectionBound(Event event) {
logger.fine("Connection bound, configuring transport");
Transport transport = event.getTransport();
if (sslContext != null) {
// Configure SASL EXTERNAL
Sasl sasl = transport.sasl();
sasl.setMechanisms("EXTERNAL");
// Configure SSL
SslDomain sslDomain = Proton.sslDomain();
sslDomain.init(SslDomain.Mode.CLIENT);
sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
sslDomain.setSslContext(sslContext);
transport.ssl(sslDomain);
}
}
@Override
public void onConnectionRemoteOpen(Event event) {
logger.fine("Connection opened");
Connection connection = event.getConnection();
Session session = connection.session();
session.open();
}
@Override
public void onSessionRemoteOpen(Event event) {
logger.fine("Session opened");
Session session = event.getSession();
String sourceAddress = (String) endpoint.get("source");
Receiver receiver = session.receiver(sourceAddress);
Source source = new Source();
source.setAddress(sourceAddress);
receiver.setSource(source);
Target target = new Target();
receiver.setTarget(target);
receiver.open();
}
@Override
public void onLinkRemoteClose(Event event) {
Link link = event.getLink();
logger.severe("Link remote close - State: " + link.getRemoteState());
if (link.getRemoteCondition() != null) {
logger.severe("Condition: " + link.getRemoteCondition().getCondition());
logger.severe("Description: " + link.getRemoteCondition().getDescription());
}
}
@Override
public void onLinkRemoteOpen(Event event) {
logger.fine("Receiver link opened, ready to receive messages");
if (event.getLink() instanceof Receiver) {
Receiver receiver = (Receiver) event.getLink();
receiver.flow(10); // Initial credit
}
}
@Override
public void onDelivery(Event event) {
Delivery delivery = event.getDelivery();
if (delivery.isReadable() && !delivery.isPartial()) {
Receiver receiver = (Receiver) delivery.getLink();
// Read the message
int size = delivery.pending();
byte[] buffer = new byte[size];
int read = receiver.recv(buffer, 0, buffer.length);
receiver.advance();
// Decode the message
Message message = Proton.message();
message.decode(buffer, 0, read);
// Extract body
String bodyText = "";
if (message.getBody() != null) {
Object body = message.getBody();
if (body instanceof org.apache.qpid.proton.amqp.messaging.Data) {
org.apache.qpid.proton.amqp.messaging.Data data = (org.apache.qpid.proton.amqp.messaging.Data) body;
if (data.getValue() != null) {
bodyText = new String(data.getValue().getArray(), StandardCharsets.UTF_8);
}
} else {
bodyText = body.toString();
}
}
// Extract and format application properties
Map<String, Object> appProps = new HashMap<>();
if (message.getApplicationProperties() != null && message.getApplicationProperties().getValue() != null) {
appProps = message.getApplicationProperties().getValue();
}
try {
// Format properties in sorted order
String sortedPropsJson = objectMapper.writeValueAsString(new TreeMap<>(appProps));
logger.info(String.format("Message received: body='%s', properties=%s", bodyText, sortedPropsJson));
} catch (Exception e) {
logger.warning("Error formatting properties: " + e.getMessage());
}
// Accept the message
delivery.disposition(Accepted.getInstance());
delivery.settle();
// Flow more credit if needed
if (receiver.getCredit() < 5) {
receiver.flow(10);
}
}
}
@Override
public void onTransportError(Event event) {
logger.log(Level.SEVERE, "Transport error: " + event.getTransport().getCondition());
}
}
private static SSLContext createSSLContext() throws Exception {
// Add BouncyCastle provider
Security.addProvider(new BouncyCastleProvider());
// Create SSL context
SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
// Load client certificate and key
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(null);
// Parse certificates and key using BouncyCastle
List<java.security.cert.Certificate> certChain = new ArrayList<>();
PrivateKey privateKey = null;
try (PEMParser pemParser = new PEMParser(new FileReader(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM))) {
Object object;
JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC");
JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter().setProvider("BC");
while ((object = pemParser.readObject()) != null) {
if (object instanceof X509CertificateHolder) {
certChain.add(certConverter.getCertificate((X509CertificateHolder) object));
} else if (object instanceof PEMKeyPair) {
privateKey = keyConverter.getPrivateKey(((PEMKeyPair) object).getPrivateKeyInfo());
}
}
}
// Add certificate chain and key to keystore
if (privateKey != null && !certChain.isEmpty()) {
keyStore.setKeyEntry("client", privateKey, new char[0], certChain.toArray(new java.security.cert.Certificate[0]));
} else {
throw new IllegalStateException("Failed to load client certificate and key");
}
// Initialize key manager
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, new char[0]);
// Load CA certificate
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
try (FileInputStream fis = new FileInputStream(CA_CERTIFICATE_PEM)) {
java.security.cert.Certificate caCert = cf.generateCertificate(fis);
trustStore.setCertificateEntry("ca", caCert);
}
// Initialize trust manager
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);
// Initialize SSL context
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
return sslContext;
}
private static void amqpConnectAndListen(Map<String, Object> endpoint) throws Exception {
// Configure SSL
SSLContext sslContext = createSSLContext();
// Create handler
ReceiverHandler handler = new ReceiverHandler(endpoint);
handler.setSslContext(sslContext);
// Create reactor
Reactor reactor = Proton.reactor(handler);
// Connect to host with SSL and SASL configuration
String host = (String) endpoint.get("host");
int port = ((Number) endpoint.get("port")).intValue();
// Use reactor's connection method with proper SSL/SASL setup
reactor.connectionToHost(host, port, handler);
// Run reactor
reactor.run();
}
function amqpConnectAndListen(endpoint) {
const container = rhea.create_container();
// Connection options with TLS configuration
const connectionOptions = {
host: endpoint.host,
port: parseInt(endpoint.port),
transport: 'tls',
// Client certificate authentication
key: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
cert: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
// CA certificate
ca: [fs.readFileSync(CA_CERTIFICATE_PEM)],
// Connection settings
reconnect: false,
initial_reconnect_delay: 0,
max_reconnect_delay: 0,
idle_time_out: 5000, // 5 second heartbeat
container_id: 'javascript-subscription-example',
// Enable SASL EXTERNAL for client certificate authentication
enable_sasl_external: true
};
let receiver = null;
container.on('connection_open', function (context) {
console.log(`${new Date().toISOString()} DEBUG Container reactor started`);
console.log(`${new Date().toISOString()} DEBUG Creating receiver for source: ${endpoint.source}`);
// Create receiver
receiver = context.connection.open_receiver({
source: {
address: endpoint.source
}
});
});
container.on('receiver_open', function (context) {
console.log(`${new Date().toISOString()} DEBUG Receiver link opened, ready to receive messages`);
});
container.on('message', function (context) {
// Decode binary body as UTF-8
let bodyText = '';
if (context.message.body !== undefined) {
if (Buffer.isBuffer(context.message.body)) {
// Direct buffer case
bodyText = context.message.body.toString('utf8');
} else if (Array.isArray(context.message.body)) {
// Array of data sections - concatenate all buffers
for (const section of context.message.body) {
if (Buffer.isBuffer(section)) {
bodyText += section.toString('utf8');
}
}
} else if (typeof context.message.body === 'object' && context.message.body.constructor && context.message.body.constructor.name === 'Section') {
// AMQP Section object with content buffer
if (Buffer.isBuffer(context.message.body.content)) {
bodyText = context.message.body.content.toString('utf8');
} else {
bodyText = String(context.message.body.content);
}
} else if (typeof context.message.body === 'object' && context.message.body.constructor && context.message.body.constructor.name === 'Buffer') {
// Sometimes Buffer objects appear as generic objects
bodyText = Buffer.from(context.message.body).toString('utf8');
} else {
// Fallback to string conversion
bodyText = String(context.message.body);
}
}
// Format application properties in sorted order
let appProps = {};
if (context.message.application_properties) {
appProps = context.message.application_properties;
}
const sortedPropsJson = JSON.stringify(appProps, Object.keys(appProps).sort());
console.log(`${new Date().toISOString()} INFO Message received: body='${bodyText}', properties=${sortedPropsJson}`);
// Accept the message
context.delivery.accept();
});
container.on('connection_close', function (context) {
console.log(`${new Date().toISOString()} INFO Connection closed`);
if (context.connection && context.connection.error) {
console.log(`${new Date().toISOString()} SEVERE Connection close error:`, context.connection.error);
}
});
container.on('connection_error', function (context) {
const error = context.connection.error || context.error;
console.log(`${new Date().toISOString()} SEVERE Connection error:`, error);
});
container.on('error', function (context) {
console.log(`${new Date().toISOString()} SEVERE Error:`, context.error);
});
container.on('disconnected', function (context) {
console.log(`${new Date().toISOString()} SEVERE Disconnected:`, context.error || 'Unknown error');
if (context.connection && context.connection.error) {
console.log(`${new Date().toISOString()} SEVERE Disconnection details:`, context.connection.error);
}
});
// Connect
console.log(`${new Date().toISOString()} INFO Connecting to amqps://${endpoint.host}:${endpoint.port}`);
container.connect(connectionOptions);
// Return a promise that runs indefinitely (like Python while thread.is_alive())
return new Promise(() => {
// This promise never resolves, keeping the connection alive
});
}
Create a subscription
subscription_create_response = api_create_subscription()
subscription_create_response_json = subscription_create_response.json();
var subscriptionCreateResponse = await ApiCreateSubscriptionAsync(client);
var subscriptionCreateResponseContent = await subscriptionCreateResponse.Content.ReadAsStringAsync();
LogDebug($"Raw subscription create response: {subscriptionCreateResponseContent}");
LogDebug($"HTTP Status Code: {subscriptionCreateResponse.StatusCode}");
var subscriptionCreateResponseJson = JsonSerializer.Deserialize<SubscriptionCreateResponse>(subscriptionCreateResponseContent);
subscriptionCreateResponse, err := apiCreateSubscription()
if err != nil {
return fmt.Errorf("failed to create subscription: %v", err)
}
defer subscriptionCreateResponse.Body.Close()
subscriptionCreateResponseBody, err := io.ReadAll(subscriptionCreateResponse.Body)
if err != nil {
return fmt.Errorf("failed to read subscription create response: %v", err)
}
var subscriptionCreateResponseJSON map[string]interface{}
err = json.Unmarshal(subscriptionCreateResponseBody, &subscriptionCreateResponseJSON)
if err != nil {
return fmt.Errorf("failed to parse subscription create response: %v", err)
}
try (Response subscriptionCreateResponse = apiCreateSubscription()) {
String responseBody = subscriptionCreateResponse.body().string();
Map<String, Object> subscriptionCreateResponseJson = objectMapper.readValue(responseBody,
new TypeReference<Map<String, Object>>() {});
const subscriptionCreateResponse = await apiCreateSubscription();
const subscriptionCreateResponseJson = subscriptionCreateResponse.json();
Poll the subscription
subscription_id = subscription_create_response_json["id"]
subscription_status_response = api_get_subscription(subscription_id)
subscription_status_response_json = subscription_status_response.json()
log_json("Subscription %s status response" % subscription_id, subscription_status_response_json)
subscription_status = subscription_status_response_json["status"]
var subscriptionId = subscriptionCreateResponseJson.Id;
LogDebug($"Subscription ID: '{subscriptionId}'");
if (string.IsNullOrEmpty(subscriptionId))
{
LogError("Error: Subscription ID is empty or null");
return;
}
var subscriptionStatusResponse = await ApiGetSubscriptionAsync(subscriptionId, client);
var subscriptionStatusResponseContent = await subscriptionStatusResponse.Content.ReadAsStringAsync();
LogDebug($"Raw subscription status response: {subscriptionStatusResponseContent}");
var subscriptionStatusResponseJson = JsonSerializer.Deserialize<SubscriptionStatusResponse>(subscriptionStatusResponseContent);
LogJson($"Subscription {subscriptionId} status response", subscriptionStatusResponseJson);
var subscriptionStatus = subscriptionStatusResponseJson.Status;
subscriptionID := subscriptionCreateResponseJSON["id"].(string)
subscriptionStatusResponse, err := apiGetSubscription(subscriptionID)
if err != nil {
return fmt.Errorf("failed to get subscription status: %v", err)
}
defer subscriptionStatusResponse.Body.Close()
subscriptionStatusResponseBody, err := io.ReadAll(subscriptionStatusResponse.Body)
if err != nil {
return fmt.Errorf("failed to read subscription status response: %v", err)
}
var subscriptionStatusResponseJSON map[string]interface{}
err = json.Unmarshal(subscriptionStatusResponseBody, &subscriptionStatusResponseJSON)
if err != nil {
return fmt.Errorf("failed to parse subscription status response: %v", err)
}
logJSON(fmt.Sprintf("Subscription %s status response", subscriptionID), subscriptionStatusResponseJSON)
subscriptionStatus := subscriptionStatusResponseJSON["status"].(string)
String subscriptionId = (String) subscriptionCreateResponseJson.get("id");
Map<String, Object> subscriptionStatusResponseJson;
String subscriptionStatus;
try (Response subscriptionStatusResponse = apiGetSubscription(subscriptionId)) {
subscriptionStatusResponseJson = objectMapper.readValue(subscriptionStatusResponse.body().string(),
new TypeReference<Map<String, Object>>() {});
logJson("Subscription " + subscriptionId + " status response", subscriptionStatusResponseJson);
subscriptionStatus = (String) subscriptionStatusResponseJson.get("status");
}
const subscriptionId = subscriptionCreateResponseJson.id;
let subscriptionStatusResponse = await apiGetSubscription(subscriptionId);
let subscriptionStatusResponseJson = subscriptionStatusResponse.json();
logJson(`Subscription ${subscriptionId} status response`, subscriptionStatusResponseJson);
let subscriptionStatus = subscriptionStatusResponseJson.status;
Keep polling the subscription while REQUESTED
while subscription_status == "REQUESTED":
time.sleep(2)
subscription_status_response = api_get_subscription(subscription_id)
subscription_status_response_json = subscription_status_response.json()
subscription_status = subscription_status_response_json["status"]
log_json("Subscription %s status response" % subscription_id, subscription_status_response_json)
while (subscriptionStatus == "REQUESTED")
{
await Task.Delay(2000);
subscriptionStatusResponse = await ApiGetSubscriptionAsync(subscriptionId, client);
subscriptionStatusResponseContent = await subscriptionStatusResponse.Content.ReadAsStringAsync();
subscriptionStatusResponseJson = JsonSerializer.Deserialize<SubscriptionStatusResponse>(subscriptionStatusResponseContent);
subscriptionStatus = subscriptionStatusResponseJson.Status;
}
LogJson($"Subscription {subscriptionId} status response", subscriptionStatusResponseJson);
for subscriptionStatus == "REQUESTED" {
time.Sleep(2 * time.Second)
subscriptionStatusResponse, err = apiGetSubscription(subscriptionID)
if err != nil {
return fmt.Errorf("failed to get subscription status during polling: %v", err)
}
defer subscriptionStatusResponse.Body.Close()
subscriptionStatusResponseBody, err = io.ReadAll(subscriptionStatusResponse.Body)
if err != nil {
return fmt.Errorf("failed to read subscription status response during polling: %v", err)
}
err = json.Unmarshal(subscriptionStatusResponseBody, &subscriptionStatusResponseJSON)
if err != nil {
return fmt.Errorf("failed to parse subscription status response during polling: %v", err)
}
subscriptionStatus = subscriptionStatusResponseJSON["status"].(string)
}
logJSON(fmt.Sprintf("Subscription %s status response", subscriptionID), subscriptionStatusResponseJSON)
while ("REQUESTED".equals(subscriptionStatus)) {
Thread.sleep(2000);
try (Response subscriptionStatusResponse = apiGetSubscription(subscriptionId)) {
subscriptionStatusResponseJson = objectMapper.readValue(subscriptionStatusResponse.body().string(),
new TypeReference<Map<String, Object>>() {});
subscriptionStatus = (String) subscriptionStatusResponseJson.get("status");
}
}
logJson("Subscription " + subscriptionId + " status response", subscriptionStatusResponseJson);
while (subscriptionStatus === 'REQUESTED') {
await new Promise(resolve => setTimeout(resolve, 2000)); // Sleep 2 seconds
subscriptionStatusResponse = await apiGetSubscription(subscriptionId);
subscriptionStatusResponseJson = subscriptionStatusResponse.json();
subscriptionStatus = subscriptionStatusResponseJson.status;
}
logJson(`Subscription ${subscriptionId} status response`, subscriptionStatusResponseJson);
Use the endpoint information to connect
if subscription_status == "CREATED":
# NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
endpoint = subscription_status_response_json["endpoints"][0]
logging.info("Using endpoint %s" % endpoint)
amqp_connect_and_listen(endpoint)
if (subscriptionStatus == "CREATED")
{
// NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
var endpoint = subscriptionStatusResponseJson.Endpoints[0];
LogInfo($"Using endpoint {JsonSerializer.Serialize(endpoint)}");
await AmqpConnectAndListenAsync(endpoint);
}
if subscriptionStatus == "CREATED" {
endpoints := subscriptionStatusResponseJSON["endpoints"].([]interface{})
if len(endpoints) > 0 {
endpoint := endpoints[0].(map[string]interface{})
log.Printf("Using endpoint %v", endpoint)
amqpConnectAndListen(endpoint)
} else {
log.Printf("No endpoints available for subscription %s", subscriptionID)
}
} else {
if ("CREATED".equals(subscriptionStatus)) {
// NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
List<Map<String, Object>> endpoints = (List<Map<String, Object>>) subscriptionStatusResponseJson.get("endpoints");
Map<String, Object> endpoint = endpoints.get(0);
logger.info("Using endpoint " + endpoint);
amqpConnectAndListen(endpoint);
}
if (subscriptionStatus === 'CREATED') {
// NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
const endpoint = subscriptionStatusResponseJson.endpoints[0];
console.log(`${new Date().toISOString()} INFO Using endpoint ${JSON.stringify(endpoint)}`);
await amqpConnectAndListen(endpoint);
} else {
Full examples
Language | Location | Description |
---|---|---|
Python | examples/subscription/python | Complete workflow: CREATE → POLL → CONNECT → USE |
.NET | examples/subscription/dotnet | Complete workflow: CREATE → POLL → CONNECT → USE |
Go | examples/subscription/go | Complete workflow: CREATE → POLL → CONNECT → USE |
Java | examples/subscription/java | Complete workflow: CREATE → POLL → CONNECT → USE |
JavaScript | examples/subscription/javascript | Complete workflow: CREATE → POLL → CONNECT → USE |