Skip to content

Creating a subscription and receiving data

This example demonstrates the complete workflow for creating a subscription and receiving data through the Local Actor API v2. The workflow follows the pattern: CREATE → POLL → CONNECT → USE, where you first create a subscription via the REST API, poll for its status until it's ready, then connect to the provided AMQP endpoint to receive messages.

Prerequisites

General Requirements

  • Valid client certificate and private key in PEM format
  • CA certificate in PEM format
  • Access to a Local Actor API v2 instance

Language-Specific Requirements

  • Python 3.x
  • python-qpid-proton library
  • requests library
  • .NET 6.0 or later
  • AMQPNetLite NuGet package
  • Go 1.18 or later
  • github.com/Azure/go-amqp package
  • Java 11 or later
  • Maven 3.6 or later
  • Apache Qpid Proton-J library
  • OkHttp library
  • Jackson library
  • Bouncy Castle library
  • Node.js 16 or later
  • rhea library

Environment Variables

| Variable | Description | Example |
| --- | --- | --- |
| ACTOR_API_HOST | Hostname of the Actor API instance | api.example.com |
| ACTOR_API_PORT | Port of the Actor API instance | 443 |
| ACTOR_API_SUBSCRIPTION_SELECTOR | Selector for the subscription to create | messageType = 'TEST' |
| ACTOR_COMMON_NAME | Common name from the actor client certificate | actor.example.com |
| ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM | Path to client certificate and private key in PEM format | /path/to/client-cert-and-key.pem |
| CA_CERTIFICATE_PEM | Path to CA certificate in PEM format | /path/to/ca-cert.pem |

Configuration

# Settings are read from the environment; each falls back to a placeholder
# string when the variable is not set.
ACTOR_API_HOST = os.getenv("ACTOR_API_HOST", "hostname_of_the_actor_api")
ACTOR_API_PORT = os.getenv("ACTOR_API_PORT", "port_of_the_actor_api")
ACTOR_API_SUBSCRIPTION_SELECTOR = os.getenv(
    "ACTOR_API_SUBSCRIPTION_SELECTOR", "selector_of_the_subscription"
)
ACTOR_COMMON_NAME = os.getenv("ACTOR_COMMON_NAME", "cn_of_the_actor_client_certificate")
# Path to a single PEM file holding the client certificate chain and its private key.
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = os.getenv(
    "ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key"
)
# Path to the PEM file with the CA certificate.
CA_CERTIFICATE_PEM = os.getenv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
    // Settings are read from the environment; each falls back to a placeholder
    // string when the variable is not set.
    private static readonly string ACTOR_API_HOST =
        Environment.GetEnvironmentVariable("ACTOR_API_HOST") ?? "hostname_of_the_actor_api";

    private static readonly string ACTOR_API_PORT =
        Environment.GetEnvironmentVariable("ACTOR_API_PORT") ?? "port_of_the_actor_api";

    private static readonly string ACTOR_API_SUBSCRIPTION_SELECTOR =
        Environment.GetEnvironmentVariable("ACTOR_API_SUBSCRIPTION_SELECTOR") ?? "selector_of_the_subscription";

    private static readonly string ACTOR_COMMON_NAME =
        Environment.GetEnvironmentVariable("ACTOR_COMMON_NAME") ?? "cn_of_the_actor_client_certificate";

    // Path to a single PEM file holding the client certificate chain and its private key.
    private static readonly string ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM =
        Environment.GetEnvironmentVariable("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM") ?? "pem_with_x509_certificate_chain_and_private_key";

    // Path to the PEM file with the CA certificate.
    private static readonly string CA_CERTIFICATE_PEM =
        Environment.GetEnvironmentVariable("CA_CERTIFICATE_PEM") ?? "pem_with_x509_certificate";
// Settings are read from the environment through getEnv; each falls back to a
// placeholder string when the variable is unset or empty.
var (
    ACTOR_API_HOST                  = getEnv("ACTOR_API_HOST", "hostname_of_the_actor_api")
    ACTOR_API_PORT                  = getEnv("ACTOR_API_PORT", "port_of_the_actor_api")
    ACTOR_API_SUBSCRIPTION_SELECTOR = getEnv("ACTOR_API_SUBSCRIPTION_SELECTOR", "selector_of_the_subscription")
    ACTOR_COMMON_NAME               = getEnv("ACTOR_COMMON_NAME", "cn_of_the_actor_client_certificate")

    // Path to a single PEM file holding the client certificate chain and its private key.
    ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")

    // Path to the PEM file with the CA certificate.
    CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
)

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}
    // Settings are read from the environment through getEnv; each falls back to
    // a placeholder string when the variable is not set.
    private static final String ACTOR_API_HOST =
            getEnv("ACTOR_API_HOST", "hostname_of_the_actor_api");
    private static final String ACTOR_API_PORT =
            getEnv("ACTOR_API_PORT", "port_of_the_actor_api");
    private static final String ACTOR_API_SUBSCRIPTION_SELECTOR =
            getEnv("ACTOR_API_SUBSCRIPTION_SELECTOR", "selector_of_the_subscription");
    private static final String ACTOR_COMMON_NAME =
            getEnv("ACTOR_COMMON_NAME", "cn_of_the_actor_client_certificate");
    // Path to a single PEM file holding the client certificate chain and its private key.
    private static final String ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM =
            getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key");
    // Path to the PEM file with the CA certificate.
    private static final String CA_CERTIFICATE_PEM =
            getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate");

    // Shared JSON mapper and the HTTP client used by the api* helpers.
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static OkHttpClient httpClient;

    /**
     * Looks up an environment variable.
     *
     * @param key          name of the environment variable
     * @param defaultValue value returned when the variable is not set
     * @return the variable's value, or {@code defaultValue} when it is not set
     */
    private static String getEnv(String key, String defaultValue) {
        String fromEnvironment = System.getenv(key);
        if (fromEnvironment == null) {
            return defaultValue;
        }
        return fromEnvironment;
    }
// Settings are read from the environment; each falls back to its placeholder
// string when the variable is unset or empty.
const [
    ACTOR_API_HOST,
    ACTOR_API_PORT,
    ACTOR_API_SUBSCRIPTION_SELECTOR,
    ACTOR_COMMON_NAME,
    ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
    CA_CERTIFICATE_PEM,
] = [
    ['ACTOR_API_HOST', 'hostname_of_the_actor_api'],
    ['ACTOR_API_PORT', 'port_of_the_actor_api'],
    ['ACTOR_API_SUBSCRIPTION_SELECTOR', 'selector_of_the_subscription'],
    ['ACTOR_COMMON_NAME', 'cn_of_the_actor_client_certificate'],
    ['ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM', 'pem_with_x509_certificate_chain_and_private_key'],
    ['CA_CERTIFICATE_PEM', 'pem_with_x509_certificate'],
].map(([variableName, placeholder]) => process.env[variableName] || placeholder);

Explanation

Configuration by environment variables

API Functions

def api_url(endpoint):
    """Build the HTTPS URL of an actor-scoped API endpoint.

    The result has the form https://<host>:<port>/<actor common name>/<endpoint>.
    """
    return "https://{}:{}/{}/{}".format(
        ACTOR_API_HOST, ACTOR_API_PORT, ACTOR_COMMON_NAME, endpoint
    )

def api_get(endpoint):
    """Send a GET request to the endpoint, presenting the actor client certificate."""
    url = api_url(endpoint)
    return requests.get(
        url,
        cert=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
        verify=CA_CERTIFICATE_PEM,
    )

def api_post(endpoint, json_data):
    """Send a POST request with json_data as the JSON body, presenting the actor client certificate."""
    url = api_url(endpoint)
    return requests.post(
        url,
        data=None,
        json=json_data,
        cert=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
        verify=CA_CERTIFICATE_PEM,
    )

def api_delete(endpoint):
    """Send a DELETE request to the endpoint, presenting the actor client certificate."""
    url = api_url(endpoint)
    return requests.delete(
        url,
        cert=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
        verify=CA_CERTIFICATE_PEM,
    )

def api_get_subscription(id):
    """Fetch the subscription with the given id."""
    endpoint = "subscriptions/{}".format(id)
    return api_get(endpoint)

def api_delete_subscription(id):
    """Delete the subscription with the given id."""
    endpoint = "subscriptions/{}".format(id)
    return api_delete(endpoint)

def api_create_subscription():
    """Create a subscription using the configured selector."""
    return api_post("subscriptions", {"selector": ACTOR_API_SUBSCRIPTION_SELECTOR})
/// <summary>
/// Creates an HttpClient that presents the actor client certificate and accepts
/// only server certificates that chain to the configured CA certificate.
/// </summary>
/// <returns>An HttpClient configured for mutual TLS.</returns>
/// <exception cref="Exception">
/// Rethrown after logging when a certificate file cannot be read or parsed.
/// </exception>
private static HttpClient CreateHttpClient()
{
    var handler = new HttpClientHandler();

    try
    {
        // The same PEM file supplies both the certificate chain and the private key.
        var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
        var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
        handler.ClientCertificates.Add(clientCert);

        // Load CA certificate for server validation
        var caCertPem = File.ReadAllText(CA_CERTIFICATE_PEM);
        var caCert = X509Certificate2.CreateFromPem(caCertPem);
        handler.ServerCertificateCustomValidationCallback = (request, cert, chain, errors) =>
        {
            if (cert == null)
            {
                return false;
            }

            // Chain errors are expected because the CA is not in the system
            // store; any other error (hostname mismatch, missing certificate)
            // must fail the handshake.
            var nonChainErrors = errors & ~System.Net.Security.SslPolicyErrors.RemoteCertificateChainErrors;
            if (nonChainErrors != System.Net.Security.SslPolicyErrors.None)
            {
                return false;
            }

            // Rebuild the chain trusting only the configured CA, so the result
            // reflects real signature, validity and trust checks.
            using var customChain = new X509Chain();
            customChain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
            customChain.ChainPolicy.CustomTrustStore.Add(caCert);
            customChain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
            if (chain != null)
            {
                // Make intermediates seen during the handshake available to the builder.
                customChain.ChainPolicy.ExtraStore.AddRange(chain.ChainPolicy.ExtraStore);
                foreach (var element in chain.ChainElements)
                {
                    customChain.ChainPolicy.ExtraStore.Add(element.Certificate);
                }
            }

            return customChain.Build(cert);
        };
    }
    catch (Exception ex)
    {
        LogError($"Error loading certificates for HTTP client: {ex.Message}");
        throw;
    }

    return new HttpClient(handler);
}

/// <summary>
/// Builds the HTTPS URL of an actor-scoped API endpoint:
/// https://{host}:{port}/{actor common name}/{endpoint}.
/// </summary>
private static string ApiUrl(string endpoint)
{
    return string.Format(
        "https://{0}:{1}/{2}/{3}",
        ACTOR_API_HOST,
        ACTOR_API_PORT,
        ACTOR_COMMON_NAME,
        endpoint);
}

/// <summary>Sends a GET request to the given API endpoint.</summary>
private static async Task<HttpResponseMessage> ApiGetAsync(string endpoint, HttpClient client)
{
    var url = ApiUrl(endpoint);
    var response = await client.GetAsync(url);
    return response;
}

/// <summary>
/// Serializes <paramref name="jsonData"/> to JSON and POSTs it to the given API endpoint.
/// </summary>
private static async Task<HttpResponseMessage> ApiPostAsync(string endpoint, object jsonData, HttpClient client)
{
    var body = new StringContent(
        JsonSerializer.Serialize(jsonData),
        Encoding.UTF8,
        "application/json");
    var url = ApiUrl(endpoint);
    var response = await client.PostAsync(url, body);
    return response;
}

/// <summary>Sends a DELETE request to the given API endpoint.</summary>
private static async Task<HttpResponseMessage> ApiDeleteAsync(string endpoint, HttpClient client)
{
    var url = ApiUrl(endpoint);
    var response = await client.DeleteAsync(url);
    return response;
}

/// <summary>Fetches the subscription with the given id.</summary>
private static async Task<HttpResponseMessage> ApiGetSubscriptionAsync(string id, HttpClient client)
{
    var endpoint = "subscriptions/" + id;
    return await ApiGetAsync(endpoint, client);
}

/// <summary>Deletes the subscription with the given id.</summary>
private static async Task<HttpResponseMessage> ApiDeleteSubscriptionAsync(string id, HttpClient client)
{
    var endpoint = "subscriptions/" + id;
    return await ApiDeleteAsync(endpoint, client);
}

/// <summary>Creates a subscription using the configured selector.</summary>
private static async Task<HttpResponseMessage> ApiCreateSubscriptionAsync(HttpClient client)
{
    return await ApiPostAsync(
        "subscriptions",
        new { selector = ACTOR_API_SUBSCRIPTION_SELECTOR },
        client);
}
func apiURL(endpoint string) string {
    return fmt.Sprintf("https://%s:%s/%s/%s", ACTOR_API_HOST, ACTOR_API_PORT, ACTOR_COMMON_NAME, endpoint)
}

// createHTTPClient returns an HTTP client that presents the actor client
// certificate, trusts only the configured CA certificate and requires at
// least TLS 1.3. It terminates the process when the certificate files cannot
// be loaded or parsed.
func createHTTPClient() *http.Client {
    // The same PEM file supplies both the certificate chain and the private key.
    cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
    if err != nil {
        log.Fatalf("Failed to load client certificate: %v", err)
    }

    caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
    if err != nil {
        log.Fatalf("Failed to read CA certificate: %v", err)
    }

    caCertPool := x509.NewCertPool()
    // AppendCertsFromPEM reports false when no certificate could be parsed.
    // Fail here with a clear message rather than later with an opaque
    // verification error caused by an empty trust pool.
    if !caCertPool.AppendCertsFromPEM(caCert) {
        log.Fatalf("Failed to parse CA certificate: no PEM certificates found in %s", CA_CERTIFICATE_PEM)
    }

    tlsConfig := &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
        MinVersion:   tls.VersionTLS13,
    }

    return &http.Client{
        Transport: &http.Transport{
            TLSClientConfig: tlsConfig,
        },
    }
}

// apiGet sends a GET request to the given API endpoint using a freshly
// created mutual-TLS client.
func apiGet(endpoint string) (*http.Response, error) {
    httpClient := createHTTPClient()
    target := apiURL(endpoint)
    return httpClient.Get(target)
}

// apiPost marshals jsonData to JSON and sends it in a POST request to the
// given API endpoint with Content-Type application/json.
func apiPost(endpoint string, jsonData map[string]interface{}) (*http.Response, error) {
    httpClient := createHTTPClient()

    payload, marshalErr := json.Marshal(jsonData)
    if marshalErr != nil {
        return nil, marshalErr
    }

    body := strings.NewReader(string(payload))
    return httpClient.Post(apiURL(endpoint), "application/json", body)
}

// apiDelete sends a DELETE request to the given API endpoint using a freshly
// created mutual-TLS client.
func apiDelete(endpoint string) (*http.Response, error) {
    httpClient := createHTTPClient()

    request, buildErr := http.NewRequest(http.MethodDelete, apiURL(endpoint), nil)
    if buildErr != nil {
        return nil, buildErr
    }

    return httpClient.Do(request)
}

// apiGetSubscription fetches the subscription with the given id.
func apiGetSubscription(id string) (*http.Response, error) {
    endpoint := "subscriptions/" + id
    return apiGet(endpoint)
}

// apiDeleteSubscription deletes the subscription with the given id.
func apiDeleteSubscription(id string) (*http.Response, error) {
    endpoint := "subscriptions/" + id
    return apiDelete(endpoint)
}

// apiCreateSubscription creates a subscription using the configured selector.
func apiCreateSubscription() (*http.Response, error) {
    return apiPost("subscriptions", map[string]interface{}{
        "selector": ACTOR_API_SUBSCRIPTION_SELECTOR,
    })
}
/**
 * Builds the HTTPS URL of an actor-scoped API endpoint:
 * https://{host}:{port}/{actor common name}/{endpoint}.
 */
private static String apiUrl(String endpoint) {
    String authority = ACTOR_API_HOST + ":" + ACTOR_API_PORT;
    return "https://" + authority + "/" + ACTOR_COMMON_NAME + "/" + endpoint;
}

/** Executes a GET request against the given API endpoint and returns the response. */
private static Response apiGet(String endpoint) throws IOException {
    String url = apiUrl(endpoint);
    Request getRequest = new Request.Builder().url(url).build();
    return httpClient.newCall(getRequest).execute();
}

/**
 * Executes a POST request against the given API endpoint, sending
 * {@code jsonData} as an application/json body.
 */
private static Response apiPost(String endpoint, String jsonData) throws IOException {
    MediaType jsonType = MediaType.parse("application/json");
    RequestBody payload = RequestBody.create(jsonData, jsonType);
    String url = apiUrl(endpoint);
    Request postRequest = new Request.Builder().url(url).post(payload).build();
    return httpClient.newCall(postRequest).execute();
}

/** Executes a DELETE request against the given API endpoint and returns the response. */
private static Response apiDelete(String endpoint) throws IOException {
    String url = apiUrl(endpoint);
    Request deleteRequest = new Request.Builder().url(url).delete().build();
    return httpClient.newCall(deleteRequest).execute();
}

/** Fetches the subscription with the given id. */
private static Response apiGetSubscription(String id) throws IOException {
    String endpoint = "subscriptions/" + id;
    return apiGet(endpoint);
}

/** Deletes the subscription with the given id. */
private static Response apiDeleteSubscription(String id) throws IOException {
    String endpoint = "subscriptions/" + id;
    return apiDelete(endpoint);
}

/** Creates a subscription using the configured selector. */
private static Response apiCreateSubscription() throws IOException {
    Map<String, Object> requestBody = Map.of("selector", ACTOR_API_SUBSCRIPTION_SELECTOR);
    return apiPost("subscriptions", objectMapper.writeValueAsString(requestBody));
}
/**
 * Builds the HTTPS URL of an actor-scoped API endpoint.
 *
 * @param {string} endpoint - Path below the actor's common name.
 * @returns {string} https://<host>:<port>/<actor common name>/<endpoint>
 */
function apiUrl(endpoint) {
    const authority = ''.concat(ACTOR_API_HOST, ':', ACTOR_API_PORT);
    return 'https://'.concat(authority, '/', ACTOR_COMMON_NAME, '/', endpoint);
}

/**
 * Creates an HTTPS agent for mutual TLS: the combined PEM file supplies both
 * the client certificate and key, and the CA file is the only trusted CA.
 *
 * @returns {https.Agent} Agent that rejects unverified server certificates.
 */
function createHttpsAgent() {
    const clientPemPath = ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM;
    const tlsOptions = { rejectUnauthorized: true };
    tlsOptions.cert = fs.readFileSync(clientPemPath);
    tlsOptions.key = fs.readFileSync(clientPemPath);
    tlsOptions.ca = [fs.readFileSync(CA_CERTIFICATE_PEM)];
    return new https.Agent(tlsOptions);
}

/**
 * Sends an HTTPS request to an actor API endpoint using mutual TLS.
 *
 * @param {string} method - HTTP method, e.g. 'GET', 'POST' or 'DELETE'.
 * @param {string} endpoint - Endpoint path, resolved through apiUrl(); may include a query string.
 * @param {*} [data=null] - Value sent as the JSON request body; no body is sent when null/undefined.
 * @returns {Promise<{statusCode: number, ok: boolean, data: string, json: function(): *}>}
 *   Resolves once the response has been fully read. `ok` is true for 2xx
 *   status codes, `data` is the response body as text and `json()` parses it.
 *   Rejects on URL, certificate, request or response stream errors.
 */
function apiRequest(method, endpoint, data = null) {
    return new Promise((resolve, reject) => {
        const url = new URL(apiUrl(endpoint));
        const payload = data == null ? null : JSON.stringify(data);

        const headers = { 'Content-Type': 'application/json' };
        if (payload !== null) {
            // Byte length, not string length: the body may contain multi-byte characters.
            headers['Content-Length'] = Buffer.byteLength(payload);
        }

        const options = {
            hostname: url.hostname,
            port: url.port,
            // Keep the query string; url.pathname alone would silently drop it.
            path: `${url.pathname}${url.search}`,
            method,
            agent: createHttpsAgent(),
            headers,
        };

        const req = https.request(options, (res) => {
            // Decode as a stream so a multi-byte UTF-8 character split across
            // two chunks is not corrupted by per-chunk string conversion.
            res.setEncoding('utf8');
            let responseData = '';

            res.on('data', (chunk) => {
                responseData += chunk;
            });

            res.on('end', () => {
                resolve({
                    statusCode: res.statusCode,
                    ok: res.statusCode >= 200 && res.statusCode < 300,
                    data: responseData,
                    json: () => JSON.parse(responseData),
                });
            });

            res.on('error', reject);
        });

        req.on('error', reject);

        if (payload !== null) {
            req.write(payload);
        }

        req.end();
    });
}

/**
 * Sends a GET request to the given API endpoint.
 *
 * @param {string} endpoint - Endpoint path.
 * @returns {Promise<object>} The response produced by apiRequest().
 */
function apiGet(endpoint) {
    const pendingResponse = apiRequest('GET', endpoint);
    return pendingResponse;
}

/**
 * Sends a POST request with a JSON body to the given API endpoint.
 *
 * @param {string} endpoint - Endpoint path.
 * @param {*} jsonData - Value sent as the JSON request body.
 * @returns {Promise<object>} The response produced by apiRequest().
 */
function apiPost(endpoint, jsonData) {
    const pendingResponse = apiRequest('POST', endpoint, jsonData);
    return pendingResponse;
}

/**
 * Sends a DELETE request to the given API endpoint.
 *
 * @param {string} endpoint - Endpoint path.
 * @returns {Promise<object>} The response produced by apiRequest().
 */
function apiDelete(endpoint) {
    const pendingResponse = apiRequest('DELETE', endpoint);
    return pendingResponse;
}

/**
 * Fetches the subscription with the given id.
 *
 * @param {string} id - Subscription id.
 * @returns {Promise<object>} The response produced by apiGet().
 */
function apiGetSubscription(id) {
    const endpoint = 'subscriptions/'.concat(id);
    return apiGet(endpoint);
}

/**
 * Deletes the subscription with the given id.
 *
 * @param {string} id - Subscription id.
 * @returns {Promise<object>} The response produced by apiDelete().
 */
function apiDeleteSubscription(id) {
    const endpoint = 'subscriptions/'.concat(id);
    return apiDelete(endpoint);
}

/**
 * Creates a subscription using the configured selector.
 *
 * @returns {Promise<object>} The response produced by apiPost().
 */
function apiCreateSubscription() {
    return apiPost('subscriptions', { selector: ACTOR_API_SUBSCRIPTION_SELECTOR });
}

Explanation

Implementation of the required subscription endpoints (see REST API Reference)

AMQP 1.0 Client

def amqp_create_ssl_config():
    """Create the client-side TLS configuration for the AMQP connection.

    The combined PEM file supplies both the client certificate and its private
    key. The broker's certificate is verified against the configured CA
    certificate, matching the VERIFY_PEER setting of the Java example.

    Returns:
        An SSLDomain in client mode, ready to pass to Container.connect().
    """
    ssl_config = SSLDomain(SSLDomain.MODE_CLIENT)
    ssl_config.set_credentials(
        cert_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
        key_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
        password=None,
    )
    # The trusted CA database is configured before peer verification is
    # enabled, so the verification mode has a trust anchor to check against.
    ssl_config.set_trusted_ca_db(CA_CERTIFICATE_PEM)
    # ANONYMOUS_PEER would skip server certificate verification entirely and
    # leave the connection open to man-in-the-middle attacks.
    ssl_config.set_peer_authentication(SSLDomain.VERIFY_PEER)
    return ssl_config

class Receiver(MessagingHandler):
    """Messaging handler that connects to a subscription endpoint and logs each message.

    Args:
        endpoint: Mapping with the keys "host", "port" and "source" describing
            the AMQP endpoint to connect to.
    """

    def __init__(self, endpoint):
        super(Receiver, self).__init__()
        self.__endpoint = endpoint

    def on_start(self, event):
        """Open the TLS connection and attach a receiving link to the endpoint's source."""
        logging.debug("Container reactor started")
        container = event.container
        endpoint = self.__endpoint

        # Step 1: connect
        ssl_config = amqp_create_ssl_config()
        amqp_url = "amqps://%s:%s" % (endpoint["host"], endpoint["port"])
        connection = container.connect(amqp_url, ssl_domain = ssl_config, reconnect = False, heartbeat = 5)

        # Step 2: create a receiving link using the source address of the endpoint
        container.create_receiver(connection, endpoint["source"])

    def on_message(self, event):
        """Log the body and the application properties of a received message."""
        body_text = self._decode_body(event.message.body)
        sorted_props_json = self._format_properties(event.message)

        logging.info("Message received: body='%s', properties=%s", body_text, sorted_props_json)

    @staticmethod
    def _decode_body(body):
        """Return the message body as text, decoding binary bodies as UTF-8."""
        # Undecodable bytes are replaced rather than raised, so one malformed
        # message cannot raise out of the event callback.
        if isinstance(body, bytes):
            return body.decode('utf-8', errors='replace')
        if hasattr(body, 'tobytes'):
            # Handle memory view objects
            return body.tobytes().decode('utf-8', errors='replace')
        return str(body)

    @staticmethod
    def _format_properties(message):
        """Return the message's application properties as JSON with sorted keys."""
        app_props = {}
        if hasattr(message, 'properties') and message.properties:
            app_props = message.properties
        elif hasattr(message, 'application_properties') and message.application_properties:
            app_props = message.application_properties
        # default=str keeps logging working for property values that are not
        # JSON-serializable (for example bytes or UUID values).
        return json.dumps(app_props, sort_keys=True, default=str)

def amqp_connect_and_listen(endpoint):
    """Run the AMQP receiver on a daemon thread and block until that thread ends."""
    container = Container(Receiver(endpoint))
    worker = threading.Thread(target=container.run, name="AMQPClient", daemon=True)
    worker.start()
    # Wait in one-second steps for as long as the worker thread is running.
    while True:
        if not worker.is_alive():
            break
        time.sleep(1)
/// <summary>
/// Creates an AMQP connection factory configured with the actor client
/// certificate, TLS 1.3, CA-based server validation through
/// ValidateCertificate and a SASL EXTERNAL profile carrying the certificate's
/// common name.
/// </summary>
/// <exception cref="Exception">Rethrown after logging when the setup fails.</exception>
private static ConnectionFactory CreateConnectionFactory()
{
    var connectionFactory = new ConnectionFactory();

    try
    {
        // The combined PEM file serves as both certificate and key input.
        var combinedPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
        var actorCertificate = X509Certificate2.CreateFromPem(combinedPem, combinedPem);
        connectionFactory.SSL.ClientCertificates.Add(actorCertificate);

        // Restrict the connection to TLS 1.3.
        connectionFactory.SSL.Protocols = System.Security.Authentication.SslProtocols.Tls13;

        // Server certificates are checked against the configured CA certificate.
        var trustedCaPem = File.ReadAllText(CA_CERTIFICATE_PEM);
        var trustedCa = X509Certificate2.CreateFromPem(trustedCaPem);
        connectionFactory.SSL.RemoteCertificateValidationCallback =
            (sender, cert, chain, errors) => ValidateCertificate(cert as X509Certificate2, trustedCa);

        // The SASL EXTERNAL identity is the CN taken from the certificate subject.
        var actorCommonName = ExtractCommonName(actorCertificate.Subject);
        LogDebug($"Certificate Subject: {actorCertificate.Subject}");
        LogDebug($"Extracted Common Name: {actorCommonName}");

        connectionFactory.SASL.Profile = new CustomSaslExternalProfile(actorCommonName);
    }
    catch (Exception ex)
    {
        LogError($"Error loading certificates: {ex.Message}");
        throw;
    }

    return connectionFactory;
}

/// <summary>
/// Checks that a server certificate chains to the given CA certificate.
/// </summary>
/// <param name="serverCert">Certificate presented by the server; may be null.</param>
/// <param name="caCert">The only CA certificate to trust as chain root.</param>
/// <returns>
/// True when a valid chain from <paramref name="serverCert"/> to
/// <paramref name="caCert"/> can be built; false otherwise or when either
/// argument is null.
/// </returns>
private static bool ValidateCertificate(X509Certificate2? serverCert, X509Certificate2 caCert)
{
    if (serverCert == null || caCert == null)
        return false;

    using var chain = new X509Chain();
    // Trust exactly the configured CA. AllowUnknownCertificateAuthority would
    // make Build() succeed for a chain ending in ANY root, which accepts
    // arbitrary self-signed server certificates.
    chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
    chain.ChainPolicy.CustomTrustStore.Add(caCert);
    chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
    // NOTE(review): only caCert is supplied here; a server certificate issued
    // by an intermediate CA can only be validated if the platform can locate
    // that intermediate - confirm against the deployed PKI.

    return chain.Build(serverCert);
}

/// <summary>
/// Extracts the CN value from a distinguished name,
/// e.g. "CN=XX99999, O=Company" yields "XX99999".
/// </summary>
/// <param name="subjectDn">Comma-separated distinguished name.</param>
/// <returns>The CN value, or the full input when no CN component is found.</returns>
private static string ExtractCommonName(string subjectDn)
{
    const string cnPrefix = "CN=";

    foreach (var component in subjectDn.Split(','))
    {
        var rdn = component.Trim();
        if (!rdn.StartsWith(cnPrefix))
        {
            continue;
        }

        return rdn.Substring(cnPrefix.Length);
    }

    // Fallback to full DN if CN not found
    return subjectDn;
}

/// <summary>
/// SASL EXTERNAL profile whose initial response is the given identity
/// (the certificate's CN value) encoded as UTF-8.
/// </summary>
public class CustomSaslExternalProfile : SaslProfile
{
    private readonly string externalIdentity;

    public CustomSaslExternalProfile(string identity) : base(new Symbol("EXTERNAL"))
    {
        externalIdentity = identity;
    }

    /// <summary>Builds the SASL init frame carrying the identity as initial response.</summary>
    protected override DescribedList GetStartCommand(string hostname)
    {
        var init = new SaslInit();
        init.Mechanism = "EXTERNAL";
        init.InitialResponse = Encoding.UTF8.GetBytes(externalIdentity);
        return init;
    }

    /// <summary>Sends nothing further; EXTERNAL only needs the initial response.</summary>
    protected override DescribedList OnCommand(DescribedList command)
    {
        return null!;
    }

    /// <summary>Returns the transport unchanged; EXTERNAL needs no transport upgrade.</summary>
    protected override ITransport UpgradeTransport(ITransport transport)
    {
        return transport;
    }
}

/// <summary>
/// Logs a received message: its body as text and its application properties
/// as JSON with keys in ordinal sorted order.
/// </summary>
/// <param name="message">The received AMQP message.</param>
private static void PrintMessageDetails(Message message)
{
    // Decode binary body as UTF-8; any other body type is logged via ToString().
    string bodyText;
    if (message.Body is byte[] bodyBytes)
    {
        bodyText = Encoding.UTF8.GetString(bodyBytes);
    }
    else
    {
        bodyText = message.Body?.ToString() ?? "";
    }

    // Format application properties as JSON in sorted order
    var appPropsJson = "{}";
    if (message.ApplicationProperties != null && message.ApplicationProperties.Map != null)
    {
        // SortedDictionary guarantees the enumeration order; a plain
        // Dictionary's order is unspecified, so sorting before ToDictionary
        // does not reliably produce sorted output. Ordinal comparison keeps
        // the order independent of the current culture.
        var sortedProps = new SortedDictionary<string, object>(StringComparer.Ordinal);
        foreach (var kvp in message.ApplicationProperties.Map)
        {
            sortedProps[kvp.Key.ToString() ?? ""] = kvp.Value;
        }
        appPropsJson = JsonSerializer.Serialize(sortedProps, new JsonSerializerOptions { WriteIndented = false });
    }

    LogInfo($"Message received: body='{bodyText}', properties={appPropsJson}");
}

/// <summary>
/// Connects to the subscription's AMQP endpoint, attaches a receiver link to
/// its source address and logs and accepts messages until Ctrl+C is pressed.
/// </summary>
/// <param name="endpoint">Host, port and source address of the subscription endpoint.</param>
private static async Task AmqpConnectAndListenAsync(SubscriptionEndpoint endpoint)
{
    // The factory already carries the client certificate and the SASL
    // EXTERNAL identity, so the certificate is not loaded again here.
    var factory = CreateConnectionFactory();

    var address = new Address($"amqps://{endpoint.Host}:{endpoint.Port}");
    var connection = await factory.CreateAsync(address);

    try
    {
        var session = new Session(connection);
        var receiver = new ReceiverLink(session, "receiver-link", endpoint.Source);

        LogInfo("Listening for messages. Press Ctrl+C to stop.");

        using var cts = new CancellationTokenSource();
        ConsoleCancelEventHandler onCancelKeyPress = (sender, e) => {
            // Keep the process alive so the links can be closed cleanly.
            e.Cancel = true;
            cts.Cancel();
        };
        Console.CancelKeyPress += onCancelKeyPress;

        try
        {
            while (!cts.Token.IsCancellationRequested)
            {
                // The one-second timeout lets the loop re-check the
                // cancellation flag regularly; null means nothing arrived.
                var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(1));
                if (message != null)
                {
                    PrintMessageDetails(message);
                    receiver.Accept(message);
                }
            }
        }
        finally
        {
            Console.CancelKeyPress -= onCancelKeyPress;
        }

        LogInfo("Stopping message listener...");

        await receiver.CloseAsync();
        await session.CloseAsync();
    }
    finally
    {
        // Always release the connection, including when receiving fails.
        await connection.CloseAsync();
    }
}
// amqpCreateTLSConfig returns the TLS configuration for the AMQP connection:
// it presents the actor client certificate, trusts only the configured CA
// certificate and requires at least TLS 1.3. It terminates the process when
// the certificate files cannot be loaded or parsed.
func amqpCreateTLSConfig() *tls.Config {
    // The same PEM file supplies both the certificate chain and the private key.
    cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
    if err != nil {
        log.Fatalf("Failed to load client certificate: %v", err)
    }

    caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
    if err != nil {
        log.Fatalf("Failed to read CA certificate: %v", err)
    }

    caCertPool := x509.NewCertPool()
    // AppendCertsFromPEM reports false when no certificate could be parsed.
    // Fail here with a clear message rather than later with an opaque
    // verification error caused by an empty trust pool.
    if !caCertPool.AppendCertsFromPEM(caCert) {
        log.Fatalf("Failed to parse CA certificate: no PEM certificates found in %s", CA_CERTIFICATE_PEM)
    }

    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      caCertPool,
        MinVersion:   tls.VersionTLS13,
    }
}

// amqpConnectAndListen connects to the subscription's AMQP endpoint over TLS
// with SASL EXTERNAL, attaches a receiver to the endpoint's source address and
// logs and accepts every message until receiving fails.
//
// endpoint must contain "host" (string), "port" (float64, as produced by
// decoding JSON into interface{}) and "source" (string). The process is
// terminated with a descriptive message when a field is missing or has the
// wrong type, or when the connection, session or receiver cannot be created.
func amqpConnectAndListen(endpoint map[string]interface{}) {
    ctx := context.Background()

    // Use checked assertions: an unchecked one would panic with an unhelpful
    // runtime error when the API response lacks a field.
    host, ok := endpoint["host"].(string)
    if !ok {
        log.Fatalf("Subscription endpoint has no valid \"host\": %v", endpoint["host"])
    }
    portValue, ok := endpoint["port"].(float64)
    if !ok {
        log.Fatalf("Subscription endpoint has no valid \"port\": %v", endpoint["port"])
    }
    port := int(portValue)
    source, ok := endpoint["source"].(string)
    if !ok {
        log.Fatalf("Subscription endpoint has no valid \"source\": %v", endpoint["source"])
    }

    amqpURL := fmt.Sprintf("amqps://%s:%d", host, port)

    conn, err := amqp.Dial(ctx, amqpURL, &amqp.ConnOptions{
        TLSConfig: amqpCreateTLSConfig(),
        SASLType:  amqp.SASLTypeExternal(""),
    })
    if err != nil {
        log.Fatalf("Failed to connect to AMQP server: %v", err)
    }
    defer conn.Close()

    session, err := conn.NewSession(ctx, nil)
    if err != nil {
        log.Fatalf("Failed to create AMQP session: %v", err)
    }
    defer session.Close(ctx)

    receiver, err := session.NewReceiver(ctx, source, nil)
    if err != nil {
        log.Fatalf("Failed to create AMQP receiver: %v", err)
    }
    defer receiver.Close(ctx)

    log.Printf("Starting to receive messages from %s. Press Ctrl+C to stop.", source)

    for {
        msg, err := receiver.Receive(ctx, nil)
        if err != nil {
            log.Printf("Error receiving message: %v", err)
            break
        }

        // Decode binary body as UTF-8; only the first data section is used.
        var bodyText string
        if len(msg.Data) > 0 {
            bodyText = string(msg.Data[0])
        }

        // Format application properties as JSON; json.Marshal emits map keys
        // in sorted order.
        appPropsJSON := "{}"
        if msg.ApplicationProperties != nil {
            appPropsBytes, err := json.Marshal(msg.ApplicationProperties)
            if err == nil {
                appPropsJSON = string(appPropsBytes)
            }
        }

        log.Printf("Message received: body='%s', properties=%s", bodyText, appPropsJSON)

        err = receiver.AcceptMessage(ctx, msg)
        if err != nil {
            log.Printf("Error accepting message: %v", err)
        }
    }
}
/**
 * Reactor event handler that opens a connection, session and receiver link to
 * a subscription endpoint and logs every fully received message.
 *
 * The endpoint map is read for the keys "host" and "source". TLS and
 * SASL EXTERNAL are only configured when an SSLContext has been provided via
 * {@link #setSslContext(SSLContext)} before the connection is bound.
 */
private static class ReceiverHandler extends BaseHandler {
    private final Map<String, Object> endpoint;
    private SSLContext sslContext;

    public ReceiverHandler(Map<String, Object> endpoint) {
        this.endpoint = endpoint;
    }

    /** Sets the SSL context used to configure the transport in onConnectionBound. */
    public void setSslContext(SSLContext sslContext) {
        this.sslContext = sslContext;
    }

    /** Sets the hostname and container id on the new connection and opens it. */
    @Override
    public void onConnectionInit(Event event) {
        logger.fine("Connection initialized");
        Connection connection = event.getConnection();
        connection.setHostname((String) endpoint.get("host"));
        connection.setContainer("java-subscription-example");
        connection.open();
    }

    /** Configures SASL EXTERNAL and TLS on the transport when an SSL context is set. */
    @Override
    public void onConnectionBound(Event event) {
        logger.fine("Connection bound, configuring transport");
        Transport transport = event.getTransport();

        // Without an SSL context the transport is left unconfigured.
        if (sslContext != null) {
            // Configure SASL EXTERNAL
            Sasl sasl = transport.sasl();
            sasl.setMechanisms("EXTERNAL");

            // Configure SSL
            SslDomain sslDomain = Proton.sslDomain();
            sslDomain.init(SslDomain.Mode.CLIENT);
            sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
            sslDomain.setSslContext(sslContext);

            transport.ssl(sslDomain);
        }
    }

    /** Opens a session once the remote side has opened the connection. */
    @Override
    public void onConnectionRemoteOpen(Event event) {
        logger.fine("Connection opened");
        Connection connection = event.getConnection();
        Session session = connection.session();
        session.open();
    }

    /** Creates and opens a receiver link on the endpoint's source address. */
    @Override
    public void onSessionRemoteOpen(Event event) {
        logger.fine("Session opened");
        Session session = event.getSession();
        String sourceAddress = (String) endpoint.get("source");
        // The source address is also used as the link name.
        Receiver receiver = session.receiver(sourceAddress);
        Source source = new Source();
        source.setAddress(sourceAddress);
        receiver.setSource(source);
        Target target = new Target();
        receiver.setTarget(target);
        receiver.open();
    }

    /** Logs the remote state and, when present, the error condition of a closed link. */
    @Override
    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        logger.severe("Link remote close - State: " + link.getRemoteState());
        if (link.getRemoteCondition() != null) {
            logger.severe("Condition: " + link.getRemoteCondition().getCondition());
            logger.severe("Description: " + link.getRemoteCondition().getDescription());
        }
    }

    /** Grants initial credit to the receiver link once the remote side has opened it. */
    @Override
    public void onLinkRemoteOpen(Event event) {
        logger.fine("Receiver link opened, ready to receive messages");
        if (event.getLink() instanceof Receiver) {
            Receiver receiver = (Receiver) event.getLink();
            receiver.flow(10); // Initial credit
        }
    }

    /**
     * Reads a completely received delivery, logs its body and application
     * properties, accepts and settles it, and tops up link credit.
     */
    @Override
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        // Partial deliveries are skipped until the whole message is available.
        if (delivery.isReadable() && !delivery.isPartial()) {
            Receiver receiver = (Receiver) delivery.getLink();

            // Read the message
            int size = delivery.pending();
            byte[] buffer = new byte[size];
            int read = receiver.recv(buffer, 0, buffer.length);
            receiver.advance();

            // Decode the message
            Message message = Proton.message();
            message.decode(buffer, 0, read);

            // Extract body: Data sections are decoded as UTF-8, anything else via toString()
            String bodyText = "";
            if (message.getBody() != null) {
                Object body = message.getBody();
                if (body instanceof org.apache.qpid.proton.amqp.messaging.Data) {
                    org.apache.qpid.proton.amqp.messaging.Data data = (org.apache.qpid.proton.amqp.messaging.Data) body;
                    if (data.getValue() != null) {
                        bodyText = new String(data.getValue().getArray(), StandardCharsets.UTF_8);
                    }
                } else {
                    bodyText = body.toString();
                }
            }

            // Extract and format application properties
            Map<String, Object> appProps = new HashMap<>();
            if (message.getApplicationProperties() != null && message.getApplicationProperties().getValue() != null) {
                appProps = message.getApplicationProperties().getValue();
            }

            try {
                // Format properties in sorted order
                String sortedPropsJson = objectMapper.writeValueAsString(new TreeMap<>(appProps));
                logger.info(String.format("Message received: body='%s', properties=%s", bodyText, sortedPropsJson));
            } catch (Exception e) {
                // A formatting failure is only logged; the message is still accepted below.
                logger.warning("Error formatting properties: " + e.getMessage());
            }

            // Accept the message
            delivery.disposition(Accepted.getInstance());
            delivery.settle();

            // Flow more credit if needed
            if (receiver.getCredit() < 5) {
                receiver.flow(10);
            }
        }
    }

    /** Logs the transport's error condition. */
    @Override
    public void onTransportError(Event event) {
        logger.log(Level.SEVERE, "Transport error: " + event.getTransport().getCondition());
    }
}

/**
 * Creates a TLS 1.3 SSLContext that presents the actor client certificate
 * and trusts only the certificates from the configured CA file.
 *
 * The client PEM file may contain the certificate chain together with either
 * a traditional key pair (e.g. "BEGIN RSA PRIVATE KEY") or an unencrypted
 * PKCS#8 key ("BEGIN PRIVATE KEY"). Every certificate found in the CA file is
 * added to the trust store.
 *
 * @return the initialized SSL context
 * @throws IllegalStateException when no private key, no client certificate or
 *         no CA certificate could be loaded
 * @throws Exception when a file cannot be read or a security object cannot be created
 */
private static SSLContext createSSLContext() throws Exception {
    // Add BouncyCastle provider
    Security.addProvider(new BouncyCastleProvider());

    // Create SSL context
    SSLContext sslContext = SSLContext.getInstance("TLSv1.3");

    // Load client certificate and key
    KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(null);

    // Parse certificates and key using BouncyCastle
    List<java.security.cert.Certificate> certChain = new ArrayList<>();
    PrivateKey privateKey = null;

    try (PEMParser pemParser = new PEMParser(new FileReader(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM))) {
        Object object;
        JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC");
        JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter().setProvider("BC");

        while ((object = pemParser.readObject()) != null) {
            if (object instanceof X509CertificateHolder) {
                certChain.add(certConverter.getCertificate((X509CertificateHolder) object));
            } else if (object instanceof PEMKeyPair) {
                privateKey = keyConverter.getPrivateKey(((PEMKeyPair) object).getPrivateKeyInfo());
            } else if (object instanceof org.bouncycastle.asn1.pkcs.PrivateKeyInfo) {
                // PKCS#8 keys ("BEGIN PRIVATE KEY") are parsed as PrivateKeyInfo
                // rather than PEMKeyPair and would otherwise be skipped.
                privateKey = keyConverter.getPrivateKey((org.bouncycastle.asn1.pkcs.PrivateKeyInfo) object);
            }
        }
    }

    // Add certificate chain and key to keystore
    if (privateKey != null && !certChain.isEmpty()) {
        keyStore.setKeyEntry("client", privateKey, new char[0], certChain.toArray(new java.security.cert.Certificate[0]));
    } else {
        throw new IllegalStateException("Failed to load client certificate and key");
    }

    // Initialize key manager
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(keyStore, new char[0]);

    // Load CA certificates
    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
    trustStore.load(null);

    CertificateFactory cf = CertificateFactory.getInstance("X.509");
    int caCount = 0;
    try (FileInputStream fis = new FileInputStream(CA_CERTIFICATE_PEM)) {
        // Load every certificate in the file, not just the first one, so CA
        // bundles work as well.
        for (java.security.cert.Certificate caCert : cf.generateCertificates(fis)) {
            trustStore.setCertificateEntry("ca-" + caCount, caCert);
            caCount++;
        }
    }
    if (caCount == 0) {
        throw new IllegalStateException("Failed to load CA certificate");
    }

    // Initialize trust manager
    TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);

    // Initialize SSL context
    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());

    return sslContext;
}

/**
 * Opens a connection to the given subscription endpoint and runs the Proton
 * reactor with a {@code ReceiverHandler} attached.
 *
 * @param endpoint endpoint description; the {@code "host"} entry is read as a
 *                 {@code String} and the {@code "port"} entry as a {@code Number}
 * @throws Exception if the SSL context cannot be created or the reactor fails
 */
private static void amqpConnectAndListen(Map<String, Object> endpoint) throws Exception {
    // The TLS context is built first and handed to the handler before the
    // reactor is created around that handler.
    final SSLContext tlsContext = createSSLContext();

    final ReceiverHandler receiverHandler = new ReceiverHandler(endpoint);
    receiverHandler.setSslContext(tlsContext);

    final Reactor eventLoop = Proton.reactor(receiverHandler);

    // Target address comes straight from the endpoint map.
    final String brokerHost = (String) endpoint.get("host");
    final Number brokerPort = (Number) endpoint.get("port");

    // Register the outgoing connection with the same handler instance.
    eventLoop.connectionToHost(brokerHost, brokerPort.intValue(), receiverHandler);

    // Hand control to the reactor.
    eventLoop.run();
}
/**
 * Connects to a subscription's AMQP endpoint over TLS, opens a receiver on
 * the endpoint's source address and logs every message that arrives.
 *
 * @param {{host: string, port: (string|number), source: string}} endpoint
 *   Endpoint entry taken from the subscription status response.
 * @returns {Promise<never>} A promise that is never settled; awaiting it
 *   keeps the caller suspended while the connection is in use.
 */
function amqpConnectAndListen(endpoint) {
    const container = rhea.create_container();

    /**
     * Renders a message body as text.
     *
     * @param {*} body - The `body` of a received message.
     * @returns {string} UTF-8 text for binary bodies, `String(body)` for
     *   anything else, and '' when there is no body.
     */
    function bodyToText(body) {
        // `typeof null` is 'object', so null has to be ruled out before the
        // object branches below dereference `body.constructor`; otherwise a
        // null body throws a TypeError inside the message handler.
        if (body === undefined || body === null) {
            return '';
        }
        if (Buffer.isBuffer(body)) {
            // Direct buffer case
            return body.toString('utf8');
        }
        if (Array.isArray(body)) {
            // Array of data sections - concatenate all buffers
            let text = '';
            for (const section of body) {
                if (Buffer.isBuffer(section)) {
                    text += section.toString('utf8');
                }
            }
            return text;
        }
        if (typeof body === 'object' && body.constructor) {
            if (body.constructor.name === 'Section') {
                // AMQP Section object with content buffer
                return Buffer.isBuffer(body.content)
                    ? body.content.toString('utf8')
                    : String(body.content);
            }
            if (body.constructor.name === 'Buffer') {
                // Sometimes Buffer objects appear as generic objects
                return Buffer.from(body).toString('utf8');
            }
        }
        // Fallback to string conversion
        return String(body);
    }

    // The same PEM file holds both the certificate chain and the private key,
    // so it is read once and used for both options.
    const clientPem = fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);

    // Connection options with TLS configuration
    const connectionOptions = {
        host: endpoint.host,
        // Explicit radix: the port is always a decimal number.
        port: Number.parseInt(endpoint.port, 10),
        transport: 'tls',
        // Client certificate authentication
        key: clientPem,
        cert: clientPem,
        // CA certificate
        ca: [fs.readFileSync(CA_CERTIFICATE_PEM)],
        // Connection settings
        reconnect: false,
        initial_reconnect_delay: 0,
        max_reconnect_delay: 0,
        idle_time_out: 5000, // 5 second heartbeat
        container_id: 'javascript-subscription-example',
        // Enable SASL EXTERNAL for client certificate authentication
        enable_sasl_external: true
    };

    container.on('connection_open', function (context) {
        console.log(`${new Date().toISOString()} DEBUG Container reactor started`);
        console.log(`${new Date().toISOString()} DEBUG Creating receiver for source: ${endpoint.source}`);
        // Create receiver
        context.connection.open_receiver({
            source: {
                address: endpoint.source
            }
        });
    });

    container.on('receiver_open', function (context) {
        console.log(`${new Date().toISOString()} DEBUG Receiver link opened, ready to receive messages`);
    });

    container.on('message', function (context) {
        // Decode binary body as UTF-8
        const bodyText = bodyToText(context.message.body);

        // Format application properties in sorted order: the key array passed
        // as the replacer fixes the order in which properties are serialized.
        const appProps = context.message.application_properties || {};
        const sortedPropsJson = JSON.stringify(appProps, Object.keys(appProps).sort());

        console.log(`${new Date().toISOString()} INFO Message received: body='${bodyText}', properties=${sortedPropsJson}`);

        // Accept the message
        context.delivery.accept();
    });

    container.on('connection_close', function (context) {
        console.log(`${new Date().toISOString()} INFO Connection closed`);
        if (context.connection && context.connection.error) {
            console.log(`${new Date().toISOString()} SEVERE Connection close error:`, context.connection.error);
        }
    });

    container.on('connection_error', function (context) {
        const error = context.connection.error || context.error;
        console.log(`${new Date().toISOString()} SEVERE Connection error:`, error);
    });

    container.on('error', function (context) {
        console.log(`${new Date().toISOString()} SEVERE Error:`, context.error);
    });

    container.on('disconnected', function (context) {
        console.log(`${new Date().toISOString()} SEVERE Disconnected:`, context.error || 'Unknown error');
        if (context.connection && context.connection.error) {
            console.log(`${new Date().toISOString()} SEVERE Disconnection details:`, context.connection.error);
        }
    });

    // Connect
    console.log(`${new Date().toISOString()} INFO Connecting to amqps://${endpoint.host}:${endpoint.port}`);
    container.connect(connectionOptions);

    // Return a promise that runs indefinitely (like Python while thread.is_alive())
    return new Promise(() => {
        // This promise never resolves, keeping the connection alive
    });
}

Create a subscription

# Create the subscription and decode the JSON body of the response.
subscription_create_response = api_create_subscription()
subscription_create_response_json = subscription_create_response.json()
// Create the subscription and read the raw response body as a string.
var subscriptionCreateResponse = await ApiCreateSubscriptionAsync(client);
var subscriptionCreateResponseContent = await subscriptionCreateResponse.Content.ReadAsStringAsync();
// Log the raw body and the HTTP status code before deserializing.
LogDebug($"Raw subscription create response: {subscriptionCreateResponseContent}");
LogDebug($"HTTP Status Code: {subscriptionCreateResponse.StatusCode}");

// Deserialize the body into a SubscriptionCreateResponse.
var subscriptionCreateResponseJson = JsonSerializer.Deserialize<SubscriptionCreateResponse>(subscriptionCreateResponseContent);
// Create the subscription; any failure is wrapped and returned to the caller.
subscriptionCreateResponse, err := apiCreateSubscription()
if err != nil {
    return fmt.Errorf("failed to create subscription: %v", err)
}
// Close the response body when the enclosing function returns.
defer subscriptionCreateResponse.Body.Close()

// Read the whole response body into memory.
subscriptionCreateResponseBody, err := io.ReadAll(subscriptionCreateResponse.Body)
if err != nil {
    return fmt.Errorf("failed to read subscription create response: %v", err)
}

// Decode the body into a generic JSON object.
var subscriptionCreateResponseJSON map[string]interface{}
err = json.Unmarshal(subscriptionCreateResponseBody, &subscriptionCreateResponseJSON)
if err != nil {
    return fmt.Errorf("failed to parse subscription create response: %v", err)
}
// Create the subscription; try-with-resources closes the response when the block exits.
// (This excerpt shows only the start of the try block.)
try (Response subscriptionCreateResponse = apiCreateSubscription()) {
    // Read the body as a string and decode it into a generic JSON map.
    String responseBody = subscriptionCreateResponse.body().string();
    Map<String, Object> subscriptionCreateResponseJson = objectMapper.readValue(responseBody, 
        new TypeReference<Map<String, Object>>() {});
// Create the subscription and decode the JSON body of the response.
const subscriptionCreateResponse = await apiCreateSubscription();
// NOTE(review): json() is used synchronously here — confirm that the object
// resolved by apiCreateSubscription() exposes a synchronous json() method.
const subscriptionCreateResponseJson = subscriptionCreateResponse.json();

Poll the subscription

# Take the id from the create response and fetch the subscription once.
subscription_id = subscription_create_response_json["id"]
subscription_status_response = api_get_subscription(subscription_id)
subscription_status_response_json = subscription_status_response.json()
# Log the decoded status response, then keep its "status" field for the polling step.
log_json("Subscription %s status response" % subscription_id, subscription_status_response_json)
subscription_status = subscription_status_response_json["status"]
// Take the id from the create response.
var subscriptionId = subscriptionCreateResponseJson.Id;
LogDebug($"Subscription ID: '{subscriptionId}'");

// Without an id there is nothing to poll, so stop here.
if (string.IsNullOrEmpty(subscriptionId))
{
    LogError("Error: Subscription ID is empty or null");
    return;
}

// Fetch the subscription once and read the raw response body.
var subscriptionStatusResponse = await ApiGetSubscriptionAsync(subscriptionId, client);
var subscriptionStatusResponseContent = await subscriptionStatusResponse.Content.ReadAsStringAsync();
LogDebug($"Raw subscription status response: {subscriptionStatusResponseContent}");

// Deserialize the body, log it, and keep its Status for the polling step.
var subscriptionStatusResponseJson = JsonSerializer.Deserialize<SubscriptionStatusResponse>(subscriptionStatusResponseContent);
LogJson($"Subscription {subscriptionId} status response", subscriptionStatusResponseJson);
var subscriptionStatus = subscriptionStatusResponseJson.Status;
// Take the id from the create response. The single-value type assertion
// panics if "id" is missing or is not a string.
subscriptionID := subscriptionCreateResponseJSON["id"].(string)
// Fetch the subscription once; failures are wrapped and returned.
subscriptionStatusResponse, err := apiGetSubscription(subscriptionID)
if err != nil {
    return fmt.Errorf("failed to get subscription status: %v", err)
}
// Close the response body when the enclosing function returns.
defer subscriptionStatusResponse.Body.Close()

// Read the whole response body into memory.
subscriptionStatusResponseBody, err := io.ReadAll(subscriptionStatusResponse.Body)
if err != nil {
    return fmt.Errorf("failed to read subscription status response: %v", err)
}

// Decode the body into a generic JSON object.
var subscriptionStatusResponseJSON map[string]interface{}
err = json.Unmarshal(subscriptionStatusResponseBody, &subscriptionStatusResponseJSON)
if err != nil {
    return fmt.Errorf("failed to parse subscription status response: %v", err)
}

// Log the decoded response and keep its "status" field for the polling step
// (the type assertion panics if "status" is missing or is not a string).
logJSON(fmt.Sprintf("Subscription %s status response", subscriptionID), subscriptionStatusResponseJSON)
subscriptionStatus := subscriptionStatusResponseJSON["status"].(string)
// Take the id from the create response.
String subscriptionId = (String) subscriptionCreateResponseJson.get("id");
// Declared outside the try block so the values remain available after the response is closed.
Map<String, Object> subscriptionStatusResponseJson;
String subscriptionStatus;

// Fetch the subscription once; try-with-resources closes the response.
try (Response subscriptionStatusResponse = apiGetSubscription(subscriptionId)) {
    // Decode the body into a generic JSON map, log it, and keep its "status" entry.
    subscriptionStatusResponseJson = objectMapper.readValue(subscriptionStatusResponse.body().string(),
        new TypeReference<Map<String, Object>>() {});
    logJson("Subscription " + subscriptionId + " status response", subscriptionStatusResponseJson);
    subscriptionStatus = (String) subscriptionStatusResponseJson.get("status");
}
// Take the id from the create response and fetch the subscription once.
const subscriptionId = subscriptionCreateResponseJson.id;
// Declared with `let` because the polling step reassigns these three variables.
let subscriptionStatusResponse = await apiGetSubscription(subscriptionId);
// NOTE(review): json() is used synchronously — confirm that the object
// resolved by apiGetSubscription() exposes a synchronous json() method.
let subscriptionStatusResponseJson = subscriptionStatusResponse.json();
logJson(`Subscription ${subscriptionId} status response`, subscriptionStatusResponseJson);
let subscriptionStatus = subscriptionStatusResponseJson.status;

Keep polling the subscription while its status is REQUESTED

# Re-fetch the subscription every 2 seconds for as long as its status is "REQUESTED".
# The loop has no timeout or retry limit.
while subscription_status == "REQUESTED":
    time.sleep(2)
    subscription_status_response = api_get_subscription(subscription_id)
    subscription_status_response_json = subscription_status_response.json()
    subscription_status = subscription_status_response_json["status"]

# Log the last status response received.
log_json("Subscription %s status response" % subscription_id, subscription_status_response_json)
// Re-fetch the subscription every 2 seconds for as long as its status is "REQUESTED".
// The loop has no timeout or retry limit.
while (subscriptionStatus == "REQUESTED")
{
    await Task.Delay(2000);
    subscriptionStatusResponse = await ApiGetSubscriptionAsync(subscriptionId, client);
    subscriptionStatusResponseContent = await subscriptionStatusResponse.Content.ReadAsStringAsync();
    subscriptionStatusResponseJson = JsonSerializer.Deserialize<SubscriptionStatusResponse>(subscriptionStatusResponseContent);
    subscriptionStatus = subscriptionStatusResponseJson.Status;
}

// Log the last status response received.
LogJson($"Subscription {subscriptionId} status response", subscriptionStatusResponseJson);
// Re-fetch the subscription every 2 seconds for as long as its status is
// "REQUESTED". The loop has no timeout or retry limit.
for subscriptionStatus == "REQUESTED" {
    time.Sleep(2 * time.Second)
    subscriptionStatusResponse, err = apiGetSubscription(subscriptionID)
    if err != nil {
        return fmt.Errorf("failed to get subscription status during polling: %v", err)
    }

    subscriptionStatusResponseBody, err = io.ReadAll(subscriptionStatusResponse.Body)
    // Close the body as soon as it has been read. A defer inside the loop
    // would only run when the enclosing function returns, so every iteration
    // would keep its response body open for as long as polling continues.
    subscriptionStatusResponse.Body.Close()
    if err != nil {
        return fmt.Errorf("failed to read subscription status response during polling: %v", err)
    }

    // Decode into a fresh map: json.Unmarshal reuses a non-nil map and keeps
    // its existing entries, which would let keys from an earlier response
    // leak into this one.
    subscriptionStatusResponseJSON = nil
    err = json.Unmarshal(subscriptionStatusResponseBody, &subscriptionStatusResponseJSON)
    if err != nil {
        return fmt.Errorf("failed to parse subscription status response during polling: %v", err)
    }

    // Checked assertion: report a malformed response as an error instead of panicking.
    status, ok := subscriptionStatusResponseJSON["status"].(string)
    if !ok {
        return fmt.Errorf("subscription status response during polling has no string \"status\" field")
    }
    subscriptionStatus = status
}

// Log the last status response received.
logJSON(fmt.Sprintf("Subscription %s status response", subscriptionID), subscriptionStatusResponseJSON)
// Re-fetch the subscription every 2 seconds for as long as its status is "REQUESTED".
// The loop has no timeout or retry limit.
while ("REQUESTED".equals(subscriptionStatus)) {
    Thread.sleep(2000);
    // try-with-resources closes each polling response before the next iteration.
    try (Response subscriptionStatusResponse = apiGetSubscription(subscriptionId)) {
        subscriptionStatusResponseJson = objectMapper.readValue(subscriptionStatusResponse.body().string(),
            new TypeReference<Map<String, Object>>() {});
        subscriptionStatus = (String) subscriptionStatusResponseJson.get("status");
    }
}

// Log the last status response received.
logJson("Subscription " + subscriptionId + " status response", subscriptionStatusResponseJson);
// Re-fetch the subscription every 2 seconds for as long as its status is 'REQUESTED'.
// The loop has no timeout or retry limit.
while (subscriptionStatus === 'REQUESTED') {
    await new Promise(resolve => setTimeout(resolve, 2000)); // Sleep 2 seconds
    subscriptionStatusResponse = await apiGetSubscription(subscriptionId);
    // NOTE(review): json() is used synchronously — confirm that the object
    // resolved by apiGetSubscription() exposes a synchronous json() method.
    subscriptionStatusResponseJson = subscriptionStatusResponse.json();
    subscriptionStatus = subscriptionStatusResponseJson.status;
}

// Log the last status response received.
logJson(`Subscription ${subscriptionId} status response`, subscriptionStatusResponseJson);

Use the endpoint information to connect

# Connect only when the subscription reached the "CREATED" status.
if subscription_status == "CREATED":
    # NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
    # Only the first entry of "endpoints" is used.
    endpoint = subscription_status_response_json["endpoints"][0]
    logging.info("Using endpoint %s" % endpoint)
    amqp_connect_and_listen(endpoint)
// Connect only when the subscription reached the "CREATED" status.
if (subscriptionStatus == "CREATED")
{
    // NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
    // Only the first entry of Endpoints is used.
    var endpoint = subscriptionStatusResponseJson.Endpoints[0];
    LogInfo($"Using endpoint {JsonSerializer.Serialize(endpoint)}");
    await AmqpConnectAndListenAsync(endpoint);
}
// Connect only when the subscription reached the "CREATED" status.
if subscriptionStatus == "CREATED" {
    // The type assertion panics if "endpoints" is missing or is not a JSON array.
    endpoints := subscriptionStatusResponseJSON["endpoints"].([]interface{})
    if len(endpoints) > 0 {
        // Only the first endpoint is used.
        endpoint := endpoints[0].(map[string]interface{})
        log.Printf("Using endpoint %v", endpoint)
        amqpConnectAndListen(endpoint)
    } else {
        log.Printf("No endpoints available for subscription %s", subscriptionID)
    }
    // (This excerpt ends at the start of the else branch.)
} else {
// Connect only when the subscription reached the "CREATED" status.
if ("CREATED".equals(subscriptionStatus)) {
    // NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
    // Unchecked cast: the "endpoints" entry is treated as a list of JSON objects.
    List<Map<String, Object>> endpoints = (List<Map<String, Object>>) subscriptionStatusResponseJson.get("endpoints");
    // Only the first endpoint is used.
    Map<String, Object> endpoint = endpoints.get(0);
    logger.info("Using endpoint " + endpoint);
    amqpConnectAndListen(endpoint);
}
// Connect only when the subscription reached the 'CREATED' status.
if (subscriptionStatus === 'CREATED') {
    // NOTE to keep things simple, this code assumes that this response contains exactly one endpoint!
    // Only the first entry of `endpoints` is used.
    const endpoint = subscriptionStatusResponseJson.endpoints[0];
    console.log(`${new Date().toISOString()} INFO Using endpoint ${JSON.stringify(endpoint)}`);
    // The promise returned by amqpConnectAndListen is awaited here.
    await amqpConnectAndListen(endpoint);
    // (This excerpt ends at the start of the else branch.)
} else {

Full examples

Language Location Description
Python examples/subscription/python Complete workflow: CREATE → POLL → CONNECT → USE
.NET examples/subscription/dotnet Complete workflow: CREATE → POLL → CONNECT → USE
Go examples/subscription/go Complete workflow: CREATE → POLL → CONNECT → USE
Java examples/subscription/java Complete workflow: CREATE → POLL → CONNECT → USE
JavaScript examples/subscription/javascript Complete workflow: CREATE → POLL → CONNECT → USE