Skip to content

Direct delivery connection with known endpoint

This example demonstrates how to connect directly to a known AMQP endpoint for publishing messages without using the Actor API to create a delivery. This is useful when you already have the endpoint information from a previous delivery creation or when working with persistent delivery endpoints.

Prerequisites

General Requirements

  • Valid client certificate and private key in PEM format
  • CA certificate in PEM format
  • Known AMQP endpoint information (host, port, target address)

Language-Specific Requirements

  • Python 3.x
  • python-qpid-proton library
  • .NET 6.0 or later
  • AMQPNetLite NuGet package
  • Go 1.18 or later
  • github.com/Azure/go-amqp package
  • Java 11 or later
  • Apache Qpid Proton-J library
  • Node.js 16 or later
  • rhea library

Environment Variables

Variable Description Example
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM Path to client certificate and private key in PEM format /path/to/client-cert-and-key.pem
CA_CERTIFICATE_PEM Path to CA certificate in PEM format /path/to/ca-cert.pem
MESSAGE_APPLICATION_PROPERTIES_JSON Message properties in JSON format {"messageType": "TEST", "publisherId": "XX99999", "publicationId": "XX99999:TEST", "originatingCountry": "XX", "protocolVersion": "TEST:0.0.0", "quadTree": ",1004,"}
ENDPOINT_HOST AMQP endpoint hostname amqp.example.com
ENDPOINT_PORT AMQP endpoint port 5671
ENDPOINT_TARGET AMQP target address for delivery delivery-target-address

Configuration

# File paths of the PEM files used for TLS, and the JSON text that is parsed
# into the properties of every published message. Each value falls back to a
# placeholder string when the environment variable is not set.
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = os.getenv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM = os.getenv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
MESSAGE_APPLICATION_PROPERTIES_JSON = os.getenv("MESSAGE_APPLICATION_PROPERTIES_JSON", "message_application_properties_json")

# Endpoint that is already known up front (host, port and link target address)
ENDPOINT_HOST = os.getenv("ENDPOINT_HOST", "amqp_endpoint_host")
ENDPOINT_PORT = os.getenv("ENDPOINT_PORT", "amqp_endpoint_port")
ENDPOINT_TARGET = os.getenv("ENDPOINT_TARGET", "amqp_endpoint_target_address")
    // Configuration read once from environment variables; each field falls back to a
    // placeholder string when its variable is not set.
    // Path of the PEM file holding the client certificate chain and private key.
    private static readonly string ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = Environment.GetEnvironmentVariable("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM") ?? "pem_with_x509_certificate_chain_and_private_key";
    // Path of the PEM file holding the CA certificate used to validate the server.
    private static readonly string CA_CERTIFICATE_PEM = Environment.GetEnvironmentVariable("CA_CERTIFICATE_PEM") ?? "pem_with_x509_certificate";
    // JSON text that is deserialized into the application properties of each message.
    private static readonly string MESSAGE_APPLICATION_PROPERTIES_JSON = Environment.GetEnvironmentVariable("MESSAGE_APPLICATION_PROPERTIES_JSON") ?? "message_application_properties_json";

    // Pre-known endpoint information
    private static readonly string ENDPOINT_HOST = Environment.GetEnvironmentVariable("ENDPOINT_HOST") ?? "amqp_endpoint_host";
    // Kept as text here; it is converted with int.Parse when the endpoint is built.
    private static readonly string ENDPOINT_PORT = Environment.GetEnvironmentVariable("ENDPOINT_PORT") ?? "amqp_endpoint_port";
    private static readonly string ENDPOINT_TARGET = Environment.GetEnvironmentVariable("ENDPOINT_TARGET") ?? "amqp_endpoint_target_address";
// Configuration resolved from environment variables through getEnv; each
// variable falls back to a placeholder string.
var (
    // Path of the PEM file holding the client certificate chain and private key.
    ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
    // Path of the PEM file holding the CA certificate(s).
    CA_CERTIFICATE_PEM                  = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
    // JSON text that is unmarshalled into the application properties of each message.
    MESSAGE_APPLICATION_PROPERTIES_JSON = getEnv("MESSAGE_APPLICATION_PROPERTIES_JSON", "message_application_properties_json")

    // Pre-known endpoint information
    ENDPOINT_HOST   = getEnv("ENDPOINT_HOST", "amqp_endpoint_host")
    ENDPOINT_PORT   = getEnv("ENDPOINT_PORT", "amqp_endpoint_port")
    ENDPOINT_TARGET = getEnv("ENDPOINT_TARGET", "amqp_endpoint_target_address")
)

func getEnv(key, defaultValue string) string {
    if value := os.Getenv(key); value != "" {
        return value
    }
    return defaultValue
}
    // Configuration resolved once from environment variables through getEnv; each
    // constant falls back to a placeholder string when its variable is not set.
    // Path of the PEM file holding the client certificate chain and private key.
    private static final String ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key");
    // Path of the PEM file holding the CA certificate.
    private static final String CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate");
    // JSON text that is parsed into the application properties of each message.
    private static final String MESSAGE_APPLICATION_PROPERTIES_JSON = getEnv("MESSAGE_APPLICATION_PROPERTIES_JSON", "message_application_properties_json");

    // Pre-known endpoint information
    private static final String ENDPOINT_HOST = getEnv("ENDPOINT_HOST", "amqp_endpoint_host");
    // Kept as text here; it is converted with Integer.parseInt before connecting.
    private static final String ENDPOINT_PORT = getEnv("ENDPOINT_PORT", "amqp_endpoint_port");
    private static final String ENDPOINT_TARGET = getEnv("ENDPOINT_TARGET", "amqp_endpoint_target_address");

    /**
     * Looks up an environment variable.
     *
     * @param key          name of the environment variable
     * @param defaultValue value to use when the variable is not defined
     * @return the variable's value (which may be empty), or {@code defaultValue} when it is not defined
     */
    private static String getEnv(String key, String defaultValue) {
        final String configured = System.getenv(key);
        if (configured == null) {
            return defaultValue;
        }
        return configured;
    }
// Configuration taken from environment variables. Because `||` is used, an unset
// or empty variable falls back to the placeholder string on the right.
// Path of the PEM file holding the client certificate chain and private key.
const ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = process.env.ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM || 'pem_with_x509_certificate_chain_and_private_key';
// Path of the PEM file holding the CA certificate.
const CA_CERTIFICATE_PEM = process.env.CA_CERTIFICATE_PEM || 'pem_with_x509_certificate';
// JSON text that is parsed into the application properties of each message.
const MESSAGE_APPLICATION_PROPERTIES_JSON = process.env.MESSAGE_APPLICATION_PROPERTIES_JSON || 'message_application_properties_json';

// Pre-known endpoint information
const ENDPOINT_HOST = process.env.ENDPOINT_HOST || 'amqp_endpoint_host';
// Kept as text here; it is converted with parseInt when the connection options are built.
const ENDPOINT_PORT = process.env.ENDPOINT_PORT || 'amqp_endpoint_port';
const ENDPOINT_TARGET = process.env.ENDPOINT_TARGET || 'amqp_endpoint_target_address';

// Message counter (incremented by sendMessage for every message it builds)
let messageCount = 0;

Explanation

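Each example reads its configuration from environment variables: the certificate file paths, the message application properties, and the pre-known endpoint (host, port, and target address). The fallback values in the code are placeholders only. In particular, ENDPOINT_PORT must be set to a numeric port (for example 5671), because several of the examples parse it as an integer.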

AMQP 1.0 Client

def amqp_create_ssl_config():
    """Build the client-side TLS configuration for the AMQP connection.

    The client certificate chain and its private key are both read from the
    single PEM file named by ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM. The server
    certificate is verified against the CA certificate(s) in CA_CERTIFICATE_PEM.

    Returns:
        An SSLDomain in client mode, ready to be passed to Container.connect().
    """
    ssl_config = SSLDomain(SSLDomain.MODE_CLIENT)
    # The trusted CA database must be configured before peer verification is enabled.
    ssl_config.set_trusted_ca_db(CA_CERTIFICATE_PEM)
    # Verify the server certificate against the trusted CA. ANONYMOUS_PEER would
    # skip verification entirely and leave the connection open to impersonation.
    # SSLDomain.VERIFY_PEER_NAME additionally checks the host name, if the
    # endpoint certificate is issued for the host name being connected to.
    ssl_config.set_peer_authentication(SSLDomain.VERIFY_PEER)
    ssl_config.set_credentials(cert_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, key_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, password=None)
    return ssl_config

class Sender(MessagingHandler):
    """Event handler that connects to a known endpoint and publishes one message per second.

    The first message is sent as soon as the sender link has credit. Each
    following message is scheduled one second after the previous one settles.
    """

    def __init__(self, endpoint):
        """Store the endpoint; it is read with the keys "host", "port" and "target"."""
        super(Sender, self).__init__()
        self.__endpoint = endpoint
        self.message_count = 0
        # Set once the first message has been sent, so that the link-opened and
        # sendable events cannot both start the send cycle.
        self._sending_started = False

    def on_start(self, event):
        """Open the TLS connection and the sending link when the container starts."""
        logging.debug("Container reactor started")
        container = event.container
        endpoint = self.__endpoint

        # Step 1: connect
        ssl_config = amqp_create_ssl_config()
        amqp_url = "amqps://%s:%s" % (endpoint["host"], endpoint["port"])
        connection = container.connect(amqp_url, ssl_domain = ssl_config, reconnect = False, heartbeat = 5)

        # Step 2: create a sending link using the target address of the endpoint
        self.sender_link = container.create_sender(connection, endpoint["target"])

    def send_message(self):
        """Build the next numbered message, log it, and send it on the sender link."""
        # Increment message counter
        self.message_count += 1
        # Create dynamic message content with counter and timestamp
        body_text = f"Hello World! Message #{self.message_count} at {datetime.now().strftime('%H:%M:%S')}"
        # Parse the configured properties once; the same dict is sent and logged
        properties = json.loads(MESSAGE_APPLICATION_PROPERTIES_JSON)
        message = Message()
        # Make sure to use the Data type for the body to ensure it is sent as binary
        message.inferred = True
        message.body = body_text.encode('utf-8')
        message.properties = properties
        # Format properties in sorted order for consistent logging
        sorted_properties = json.dumps(properties, sort_keys=True)
        logging.info("Sending message: body='%s', properties=%s", body_text, sorted_properties)
        self.sender_link.send(message)

    def _start_sending(self, link):
        """Send the first message once, as soon as the link reports credit."""
        if self._sending_started:
            return
        if getattr(link, 'credit', 0) > 0:
            self._sending_started = True
            self.send_message()

    def on_link_opened(self, event):
        # Send first message when link is opened (like Java onLinkRemoteOpen)
        self._start_sending(event.link)

    def on_sendable(self, event):
        # Credit may only arrive after the link-opened event; without this
        # fallback the send cycle would never start in that case.
        self._start_sending(event.link)

    def on_settled(self, event):
        # Schedule next message after 1 second
        event.container.schedule(1.0, self)

    def on_timer_task(self, event):
        # Send next message
        self.send_message()

def amqp_connect_and_publish(endpoint):
    """Run a Sender for ``endpoint`` on a background thread and block until it stops.

    The container runs on a daemon thread named "AMQPClient"; the calling
    thread checks once per second whether that thread is still alive.
    """
    container = Container(Sender(endpoint))
    worker = threading.Thread(target = container.run, name = "AMQPClient", daemon = True)
    worker.start()
    while True:
        if not worker.is_alive():
            break
        time.sleep(1)
// Builds the ConnectionFactory used for the AMQP connection:
//  - client certificate and key loaded from ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM,
//  - TLS 1.3,
//  - server certificate checked through ValidateCertificate against CA_CERTIFICATE_PEM,
//  - SASL EXTERNAL profile carrying the common name taken from the client certificate subject.
// Any failure is logged and rethrown to the caller.
private static ConnectionFactory CreateConnectionFactory()
{
    var connectionFactory = new ConnectionFactory();

    try
    {
        // The same PEM text supplies both the certificate and the private key
        var clientPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
        var clientCertificate = X509Certificate2.CreateFromPem(clientPem, clientPem);
        connectionFactory.SSL.ClientCertificates.Add(clientCertificate);

        // Restrict the connection to TLS 1.3
        connectionFactory.SSL.Protocols = System.Security.Authentication.SslProtocols.Tls13;

        // The CA certificate is captured by the validation callback below
        var caCertificate = X509Certificate2.CreateFromPem(File.ReadAllText(CA_CERTIFICATE_PEM));
        connectionFactory.SSL.RemoteCertificateValidationCallback =
            (sender, cert, chain, errors) => ValidateCertificate(cert as X509Certificate2, caCertificate);

        // SASL EXTERNAL identity: only the CN value of the client certificate subject
        var subjectCommonName = ExtractCommonName(clientCertificate.Subject);
        LogDebug($"Certificate Subject: {clientCertificate.Subject}");
        LogDebug($"Extracted Common Name: {subjectCommonName}");

        connectionFactory.SASL.Profile = new CustomSaslExternalProfile(subjectCommonName);
    }
    catch (Exception ex)
    {
        LogError($"Error loading certificates: {ex.Message}");
        throw;
    }

    return connectionFactory;
}

// Checks that serverCert chains up to caCert.
//
// Returns false when either certificate is null or when no valid chain ending
// in caCert can be built; returns true otherwise. Revocation is not checked.
//
// The chain is built with CustomRootTrust so that ONLY caCert is accepted as a
// trust anchor. The earlier combination of ExtraStore and
// AllowUnknownCertificateAuthority made Build() succeed for chains ending in any
// untrusted root, so the supplied CA was not actually enforced.
private static bool ValidateCertificate(X509Certificate2? serverCert, X509Certificate2 caCert)
{
    if (serverCert == null || caCert == null)
        return false;

    // X509Chain holds native resources; dispose it when validation is done
    using var chain = new X509Chain();
    chain.ChainPolicy.TrustMode = X509ChainTrustMode.CustomRootTrust;
    chain.ChainPolicy.CustomTrustStore.Add(caCert);
    chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;

    return chain.Build(serverCert);
}

// Returns the CN value of a distinguished name, e.g. "CN=XX99999, O=Company" -> "XX99999".
// The name is split on commas and the first trimmed component starting with "CN=" wins.
// When no such component exists, the full distinguished name is returned unchanged.
private static string ExtractCommonName(string subjectDn)
{
    const string cnPrefix = "CN=";

    string[] components = subjectDn.Split(',');
    for (int i = 0; i < components.Length; i++)
    {
        string component = components[i].Trim();
        if (!component.StartsWith(cnPrefix))
        {
            continue;
        }
        return component.Substring(cnPrefix.Length);
    }

    // No CN component found
    return subjectDn;
}

// SASL profile for the EXTERNAL mechanism whose initial response is the
// UTF-8 encoding of the identity string given to the constructor.
public class CustomSaslExternalProfile : SaslProfile
{
    private readonly string initialResponseIdentity;

    public CustomSaslExternalProfile(string identity) : base(new Symbol("EXTERNAL"))
        => initialResponseIdentity = identity;

    // Builds the sasl-init command: mechanism EXTERNAL plus the identity bytes.
    protected override DescribedList GetStartCommand(string hostname)
    {
        var init = new SaslInit();
        init.Mechanism = "EXTERNAL";
        init.InitialResponse = Encoding.UTF8.GetBytes(initialResponseIdentity);
        return init;
    }

    // For EXTERNAL only the initial response is sent; later commands get no reply.
    protected override DescribedList OnCommand(DescribedList command) => null!;

    // No transport upgrade needed for EXTERNAL; the transport is returned unchanged.
    protected override ITransport UpgradeTransport(ITransport transport) => transport;
}

// Connects to the given endpoint over amqps and publishes one message per second
// until an exception occurs.
//
// Each message carries a UTF-8 text body in an AMQP Data section and the
// application properties parsed from MESSAGE_APPLICATION_PROPERTIES_JSON.
// The connection is closed in a finally block, so it is released when the loop
// is left through an exception (the loop itself never exits normally).
private static async Task AmqpConnectAndPublishAsync(DeliveryEndpoint endpoint)
{
    // The factory already carries the client certificate and the SASL EXTERNAL identity
    var factory = CreateConnectionFactory();

    var address = new Address($"amqps://{endpoint.Host}:{endpoint.Port}");
    var connection = await factory.CreateAsync(address);

    try
    {
        var session = new Session(connection);
        var sender = new SenderLink(session, "sender-link", endpoint.Target);

        // Deserialize returns null for the JSON literal "null"; treat that as "no properties"
        var messageProperties =
            JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(MESSAGE_APPLICATION_PROPERTIES_JSON)
            ?? new Dictionary<string, JsonElement>();

        // Format properties in sorted order for consistent logging (computed once, reused for every message)
        var sortedPropsJson = JsonSerializer.Serialize(
            new SortedDictionary<string, JsonElement>(messageProperties, StringComparer.Ordinal),
            new JsonSerializerOptions { WriteIndented = false });

        LogDebug("Container reactor started");

        int messageCount = 0;

        // Send messages continuously
        while (true)
        {
            // Increment message counter
            messageCount++;
            // Create dynamic message content with counter and timestamp
            var bodyText = $"Hello World! Message #{messageCount} at {DateTime.Now:HH:mm:ss}";
            var bodyBinary = Encoding.UTF8.GetBytes(bodyText);
            var message = new Message()
            {
                BodySection = new Data() { Binary = bodyBinary },
                ApplicationProperties = new ApplicationProperties()
            };

            foreach (var prop in messageProperties)
            {
                message.ApplicationProperties[prop.Key] = ConvertJsonValue(prop.Value);
            }

            LogInfo($"Sending message: body='{bodyText}', properties={sortedPropsJson}");
            await sender.SendAsync(message);

            // Wait before sending the next message
            await Task.Delay(1000);
        }
    }
    finally
    {
        // Closing the connection also ends the session and sender link created on it
        await connection.CloseAsync();
    }
}

// Converts a JSON value to a plain .NET value for use as an AMQP application property.
// Numbers become int when they fit, then long, then double, so that fractional or
// large values no longer make the conversion throw. Arrays and objects are passed
// as their JSON text.
private static object? ConvertJsonValue(JsonElement element)
{
    switch (element.ValueKind)
    {
        case JsonValueKind.String:
            return element.GetString();
        case JsonValueKind.Number:
            if (element.TryGetInt32(out var asInt32))
                return asInt32;
            if (element.TryGetInt64(out var asInt64))
                return asInt64;
            return element.GetDouble();
        case JsonValueKind.True:
            return true;
        case JsonValueKind.False:
            return false;
        case JsonValueKind.Null:
            return null;
        default:
            return element.ToString();
    }
}
// amqpCreateTLSConfig builds the TLS client configuration for the AMQP connection.
//
// The client certificate chain and private key are both loaded from the file
// named by ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM. The server certificate is
// verified against the CA certificates found in CA_CERTIFICATE_PEM, and
// TLS 1.3 is the minimum accepted version.
//
// An error is returned when the key pair cannot be loaded, the CA file cannot
// be read, or the CA file contains no usable PEM certificate.
func amqpCreateTLSConfig() (*tls.Config, error) {
    // Load client certificate and key
    cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
    if err != nil {
        return nil, fmt.Errorf("failed to load client certificate: %w", err)
    }

    // Load CA certificate
    caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
    if err != nil {
        return nil, fmt.Errorf("failed to read CA certificate: %w", err)
    }

    // Create certificate pool with CA. AppendCertsFromPEM reports whether any
    // certificate was added; an empty pool would otherwise only surface later
    // as an unexplained handshake failure.
    caCertPool := x509.NewCertPool()
    if !caCertPool.AppendCertsFromPEM(caCert) {
        return nil, fmt.Errorf("failed to parse CA certificate: no PEM certificates found in %s", CA_CERTIFICATE_PEM)
    }

    tlsConfig := &tls.Config{
        Certificates:       []tls.Certificate{cert},
        RootCAs:            caCertPool,
        InsecureSkipVerify: false,
        MinVersion:         tls.VersionTLS13,
    }

    return tlsConfig, nil
}

// Sender bundles the endpoint description, a running message counter and the
// AMQP handles (connection, session, sending link) used to publish messages.
type Sender struct {
    // endpoint is read with the keys "host", "port" and "target".
    endpoint map[string]string
    // messageCount is incremented for every message built by sendMessage.
    messageCount int

    // AMQP handles, populated by connect and released by close.
    client  *amqp.Conn
    session *amqp.Session
    sender  *amqp.Sender
}

// connect opens the TLS connection, a session and a sender link for the
// endpoint stored in s, and stores the three handles on s.
//
// The handles are only stored when all three steps succeed. If the session or
// the sender link cannot be created, the connection that was already opened is
// closed before the error is returned, so a failed connect leaks nothing.
func (s *Sender) connect() error {
    // Step 1: Create TLS configuration
    tlsConfig, err := amqpCreateTLSConfig()
    if err != nil {
        return err
    }

    // Create AMQP URL
    amqpURL := fmt.Sprintf("amqps://%s:%s", s.endpoint["host"], s.endpoint["port"])
    log.Printf("Connecting to %s", amqpURL)

    // Step 2: Connect with TLS; all options not listed keep their zero values
    opts := &amqp.ConnOptions{
        TLSConfig:    tlsConfig,
        SASLType:     amqp.SASLTypeExternal(""),
        MaxFrameSize: 65536,
        HostName:     s.endpoint["host"],
    }

    ctx := context.Background()
    client, err := amqp.Dial(ctx, amqpURL, opts)
    if err != nil {
        return fmt.Errorf("failed to connect: %w", err)
    }

    // Create session
    session, err := client.NewSession(ctx, nil)
    if err != nil {
        // Best-effort cleanup; the session error is the one worth reporting
        client.Close()
        return fmt.Errorf("failed to create session: %w", err)
    }

    // Step 3: Create sender link using the target address
    sender, err := session.NewSender(ctx, s.endpoint["target"], nil)
    if err != nil {
        // Best-effort cleanup; closing the connection also ends the session
        client.Close()
        return fmt.Errorf("failed to create sender: %w", err)
    }

    s.client = client
    s.session = session
    s.sender = sender

    log.Println("Container reactor started")
    return nil
}

// sendMessage builds the next numbered message and sends it on the sender link.
//
// The application properties are parsed from MESSAGE_APPLICATION_PROPERTIES_JSON
// and the body is sent as a single AMQP data section. An error is returned when
// the properties cannot be parsed or serialized, or when the send fails. The
// message counter is only incremented once the properties have been parsed.
func (s *Sender) sendMessage() error {
    // Parse message properties from JSON
    var properties map[string]interface{}
    if err := json.Unmarshal([]byte(MESSAGE_APPLICATION_PROPERTIES_JSON), &properties); err != nil {
        return fmt.Errorf("failed to parse message properties: %v", err)
    }

    // Increment message counter
    s.messageCount++
    // Create dynamic message content with counter and timestamp
    bodyText := fmt.Sprintf("Hello World! Message #%d at %s", s.messageCount, time.Now().Format("15:04:05"))

    // Create message
    msg := &amqp.Message{
        Data:                  [][]byte{[]byte(bodyText)},
        ApplicationProperties: properties,
    }

    // json.Marshal writes map keys in sorted order, so the properties can be
    // marshalled directly to get a consistently ordered log line.
    propsJSON, err := json.Marshal(properties)
    if err != nil {
        return fmt.Errorf("failed to format message properties: %v", err)
    }
    log.Printf("Sending message: body='%s', properties=%s", bodyText, string(propsJSON))

    // Send message
    if err := s.sender.Send(context.Background(), msg, nil); err != nil {
        return fmt.Errorf("failed to send message: %v", err)
    }

    return nil
}

// run sends messages forever, pausing one second after every attempt.
// A failed attempt is logged and does not stop the loop; run never returns.
func (s *Sender) run() {
    const pause = 1 * time.Second
    for {
        if sendErr := s.sendMessage(); sendErr != nil {
            log.Printf("Error sending message: %v", sendErr)
        }
        time.Sleep(pause)
    }
}

// close releases the sender link, the session and the connection, in that
// order, skipping any handle that was never set. Errors returned by the
// individual Close calls are not inspected.
func (s *Sender) close() {
    ctx := context.Background()

    if link := s.sender; link != nil {
        link.Close(ctx)
    }
    if sess := s.session; sess != nil {
        sess.Close(ctx)
    }
    if conn := s.client; conn != nil {
        conn.Close()
    }
}

// amqpConnectAndPublish connects a new Sender to endpoint and then runs its
// send loop. The connect error is returned as-is when connecting fails; the
// deferred close only applies once connecting has succeeded.
func amqpConnectAndPublish(endpoint map[string]string) error {
    publisher := &Sender{endpoint: endpoint}

    if connectErr := publisher.connect(); connectErr != nil {
        return connectErr
    }
    defer publisher.close()

    // Run sender in continuous loop
    publisher.run()
    return nil
}
/**
 * Reactor event handler that opens a connection, session and sender link to the
 * configured endpoint and publishes one message per second.
 *
 * <p>The endpoint map is read with the keys {@code "host"} and {@code "target"}.
 * The first message is sent when the link opens with credit; each following
 * message is scheduled one second after a delivery update has been settled.
 */
private static class SenderHandler extends BaseHandler {
    private final Map<String, String> endpoint;
    private final AtomicInteger messageCount = new AtomicInteger(0);
    private Sender sender;
    private final SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
    private SSLContext sslContext;

    public SenderHandler(Map<String, String> endpoint) {
        this.endpoint = endpoint;
    }

    /** Sets the SSL context used in {@link #onConnectionBound}; TLS and SASL are skipped when it is null. */
    public void setSslContext(SSLContext sslContext) {
        this.sslContext = sslContext;
    }

    @Override
    public void onConnectionInit(Event event) {
        logger.fine("Connection initialized");
        Connection connection = event.getConnection();
        connection.setHostname(endpoint.get("host"));
        connection.setContainer("java-delivery-direct-example");
        connection.open();
    }

    @Override
    public void onConnectionBound(Event event) {
        logger.fine("Connection bound, configuring transport");
        Transport transport = event.getTransport();

        if (sslContext != null) {
            // Configure SASL EXTERNAL
            Sasl sasl = transport.sasl();
            sasl.setMechanisms("EXTERNAL");

            // Configure SSL
            SslDomain sslDomain = Proton.sslDomain();
            sslDomain.init(SslDomain.Mode.CLIENT);
            sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
            sslDomain.setSslContext(sslContext);

            transport.ssl(sslDomain);
        }
    }

    @Override
    public void onConnectionRemoteOpen(Event event) {
        logger.fine("Connection opened");
        Connection connection = event.getConnection();
        Session session = connection.session();
        session.open();
    }

    @Override
    public void onSessionRemoteOpen(Event event) {
        logger.fine("Session opened");
        Session session = event.getSession();
        Target target = new Target();
        String targetAddress = endpoint.get("target");
        target.setAddress(targetAddress);
        // The target address doubles as the link name
        sender = session.sender(targetAddress);
        sender.setTarget(target);
        Source source = new Source();
        sender.setSource(source);
        sender.open();
    }

    @Override
    public void onLinkRemoteClose(Event event) {
        Link link = event.getLink();
        logger.severe("Link remote close - State: " + link.getRemoteState());
        if (link.getRemoteCondition() != null) {
            logger.severe("Condition: " + link.getRemoteCondition().getCondition());
            logger.severe("Description: " + link.getRemoteCondition().getDescription());
        }
    }

    @Override
    public void onLinkRemoteOpen(Event event) {
        logger.fine("Sender link opened, ready to send messages");
        if (event.getLink() instanceof Sender && event.getSender().getCredit() > 0) {
            sendMessage();
        }
    }

    @Override
    public void onDelivery(Event event) {
        Delivery delivery = event.getDelivery();
        if (delivery.getRemoteState() != null) {
            delivery.settle();
            // Schedule next message after 1 second
            event.getReactor().schedule(1000, this);
        }
    }

    @Override
    public void onTimerTask(Event event) {
        if (sender != null && sender.getCredit() > 0) {
            sendMessage();
        }
    }

    /**
     * Builds the next numbered message and sends it on the sender link.
     * Any exception is logged as a warning and not propagated.
     */
    private void sendMessage() {
        try {
            // Increment message counter
            int count = messageCount.incrementAndGet();
            // Create dynamic message content with counter and timestamp
            String bodyText = String.format("Hello World! Message #%d at %s", count, timeFormat.format(new Date()));

            // Create message
            Message message = Message.Factory.create();
            message.setBody(new Data(new Binary(bodyText.getBytes(StandardCharsets.UTF_8))));

            // Parse and set application properties
            ObjectMapper mapper = new ObjectMapper();

            Map<String, Object> properties = mapper.readValue(MESSAGE_APPLICATION_PROPERTIES_JSON,
                    new TypeReference<Map<String, Object>>() {
                    });
            message.setApplicationProperties(new ApplicationProperties(properties));

            // Format properties for logging
            String sortedProperties = mapper.writeValueAsString(new TreeMap<>(properties));
            logger.info(String.format("Sending message: body='%s', properties=%s", bodyText, sortedProperties));

            // Encode the message, doubling the buffer until it fits; a fixed
            // 1024-byte buffer overflows as soon as the properties grow.
            byte[] encodedMessage = new byte[1024];
            int encodedSize;
            while (true) {
                try {
                    encodedSize = message.encode(encodedMessage, 0, encodedMessage.length);
                    break;
                } catch (java.nio.BufferOverflowException overflow) {
                    encodedMessage = new byte[encodedMessage.length * 2];
                }
            }

            // Send message; the counter gives every delivery its own tag
            byte[] deliveryTag = String.valueOf(count).getBytes(StandardCharsets.UTF_8);
            sender.delivery(deliveryTag);
            sender.send(encodedMessage, 0, encodedSize);
            sender.advance();
        } catch (Exception e) {
            // The logger call already records the stack trace
            logger.log(Level.WARNING, "Error sending message", e);
        }
    }


    @Override
    public void onTransportError(Event event) {
        logger.log(Level.SEVERE, "Transport error: " + event.getTransport().getCondition());
    }
}

/**
 * Builds the TLS 1.3 {@link SSLContext} used for the AMQP connection.
 *
 * <p>The client certificate chain and private key are parsed from the PEM file
 * named by {@code ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM}; both the traditional
 * key-pair PEM form and the PKCS#8 form are accepted. Every certificate found
 * in {@code CA_CERTIFICATE_PEM} is added to the trust store.
 *
 * @return an initialized SSL context
 * @throws IllegalStateException when no private key, no client certificate or
 *                               no CA certificate could be loaded
 * @throws Exception             on any I/O, parsing or key store failure
 */
private static SSLContext createSSLContext() throws Exception {
    // Add BouncyCastle provider
    Security.addProvider(new BouncyCastleProvider());

    // Create SSL context
    SSLContext sslContext = SSLContext.getInstance("TLSv1.3");

    // Load client certificate and key
    KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(null);

    // Parse certificates and key using BouncyCastle
    List<java.security.cert.Certificate> certChain = new ArrayList<>();
    PrivateKey privateKey = null;

    try (PEMParser pemParser = new PEMParser(new FileReader(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM))) {
        Object object;
        JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC");
        JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter().setProvider("BC");

        while ((object = pemParser.readObject()) != null) {
            if (object instanceof X509CertificateHolder) {
                certChain.add(certConverter.getCertificate((X509CertificateHolder) object));
            } else if (object instanceof PEMKeyPair) {
                // Traditional form, e.g. "BEGIN RSA PRIVATE KEY"
                privateKey = keyConverter.getPrivateKey(((PEMKeyPair) object).getPrivateKeyInfo());
            } else if (object instanceof org.bouncycastle.asn1.pkcs.PrivateKeyInfo) {
                // PKCS#8 form, "BEGIN PRIVATE KEY"; previously skipped silently
                privateKey = keyConverter.getPrivateKey((org.bouncycastle.asn1.pkcs.PrivateKeyInfo) object);
            }
        }
    }

    // Add certificate chain and key to keystore
    if (privateKey != null && !certChain.isEmpty()) {
        keyStore.setKeyEntry("client", privateKey, new char[0], certChain.toArray(new java.security.cert.Certificate[0]));
    } else {
        throw new IllegalStateException("Failed to load client certificate and key");
    }

    // Initialize key manager
    KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    kmf.init(keyStore, new char[0]);

    // Load CA certificate(s)
    KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
    trustStore.load(null);

    CertificateFactory cf = CertificateFactory.getInstance("X.509");
    try (FileInputStream fis = new FileInputStream(CA_CERTIFICATE_PEM)) {
        // generateCertificates reads every certificate in the file, so a CA
        // bundle is trusted in full rather than only its first entry
        int caIndex = 0;
        for (java.security.cert.Certificate caCert : cf.generateCertificates(fis)) {
            trustStore.setCertificateEntry("ca-" + caIndex, caCert);
            caIndex++;
        }
        if (caIndex == 0) {
            throw new IllegalStateException("Failed to load CA certificate");
        }
    }

    // Initialize trust manager
    TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    tmf.init(trustStore);

    // Initialize SSL context
    sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());

    return sslContext;
}


/**
 * Creates the SSL context, a {@code SenderHandler} and a reactor, connects the
 * reactor to the endpoint's host and port, and runs the reactor.
 *
 * @param endpoint map read with the keys {@code "host"} and {@code "port"} here
 * @throws Exception when the SSL context cannot be built or the port is not a valid integer
 */
private static void amqpConnectAndPublish(Map<String, String> endpoint) throws Exception {
    // Build the TLS context first so certificate problems surface before anything else
    final SSLContext tlsContext = createSSLContext();

    final SenderHandler senderHandler = new SenderHandler(endpoint);
    senderHandler.setSslContext(tlsContext);

    final Reactor reactor = Proton.reactor(senderHandler);

    // The same handler instance is attached to the connection
    final String host = endpoint.get("host");
    final int port = Integer.parseInt(endpoint.get("port"));
    reactor.connectionToHost(host, port, senderHandler);

    // Blocks until the reactor finishes
    reactor.run();
}
// There is no separate SSL configuration function here: the TLS settings are
// passed to rhea as part of the connection options.

/**
 * Builds the next numbered message, logs it, and sends it on the given sender.
 *
 * @param {object} sender - sender link whose send() is called with the message
 */
function sendMessage(sender) {
    // Increment message counter
    messageCount += 1;

    // Body text: counter plus the current local time (HH:MM:SS)
    const [timeString] = new Date().toTimeString().split(' ');
    const bodyText = `Hello World! Message #${messageCount} at ${timeString}`;

    // Application properties come from the configured JSON text
    const properties = JSON.parse(MESSAGE_APPLICATION_PROPERTIES_JSON);

    // data_section makes sure the body is sent as AMQP Data type (binary)
    const message = {
        body: rhea.message.data_section(Buffer.from(bodyText, 'utf8')),
        application_properties: properties
    };

    // Format properties in sorted order for consistent logging
    const sortedKeys = Object.keys(properties).sort();
    const sortedProperties = JSON.stringify(properties, sortedKeys);
    console.log(`${new Date().toISOString()} INFO Sending message: body='${bodyText}', properties=${sortedProperties}`);

    sender.send(message);
}

/**
 * Connects to the endpoint over TLS with SASL EXTERNAL and publishes one
 * message per second on a sender link to the endpoint's target address.
 *
 * The send cycle is started exactly once, on the first 'sendable' event, and
 * is stopped when the connection closes or is disconnected.
 *
 * @param {{host: string, port: (string|number), target: string}} endpoint
 */
function amqpConnectAndPublish(endpoint) {
    const container = rhea.create_container();

    // Connection options with TLS configuration
    const connectionOptions = {
        host: endpoint.host,
        // Explicit radix so the port is always read as a decimal number
        port: parseInt(endpoint.port, 10),
        transport: 'tls',
        // Client certificate authentication (key and certificate share one PEM file)
        key: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
        cert: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
        // CA certificate
        ca: [fs.readFileSync(CA_CERTIFICATE_PEM)],
        // Connection settings
        reconnect: false,
        initial_reconnect_delay: 0,
        max_reconnect_delay: 0,
        idle_time_out: 0,
        container_id: 'javascript-delivery-direct-example',
        // Enable SASL EXTERNAL for client certificate authentication
        enable_sasl_external: true
    };

    let sender = null;
    let sendTimer = null;

    // Stops the periodic send, if it is running
    function stopSending() {
        if (sendTimer) {
            clearInterval(sendTimer);
            sendTimer = null;
        }
    }

    container.on('connection_open', function (context) {
        console.log(`${new Date().toISOString()} DEBUG Connection opened`);
        console.log(`${new Date().toISOString()} DEBUG Creating sender for target: ${endpoint.target}`);
        // Create sender
        sender = context.connection.open_sender({
            target: {
                address: endpoint.target
            }
        });
    });

    container.on('sendable', function (context) {
        // 'sendable' can fire more than once. Starting a new interval each time
        // would stack timers and send messages faster and faster.
        if (sendTimer) {
            return;
        }
        console.log(`${new Date().toISOString()} DEBUG Sender link opened, ready to send messages`);
        // Send first message immediately
        sendMessage(context.sender);

        // Schedule subsequent messages every 1 second
        sendTimer = setInterval(() => {
            if (context.sender.sendable()) {
                sendMessage(context.sender);
            }
        }, 1000);
    });

    container.on('accepted', function (context) {
        // Message was accepted by the broker
    });

    container.on('rejected', function (context) {
        console.log(`${new Date().toISOString()} WARNING Message rejected:`, context.delivery.remote.error);
    });

    container.on('connection_close', function (context) {
        console.log(`${new Date().toISOString()} INFO Connection closed`);
        if (context.connection && context.connection.error) {
            console.log(`${new Date().toISOString()} SEVERE Connection close error:`, context.connection.error);
        }
        stopSending();
    });

    container.on('connection_error', function (context) {
        const error = context.connection.error || context.error;
        console.log(`${new Date().toISOString()} SEVERE Connection error:`, error);
    });

    container.on('error', function (context) {
        console.log(`${new Date().toISOString()} SEVERE Error:`, context.error);
    });

    container.on('disconnected', function (context) {
        console.log(`${new Date().toISOString()} SEVERE Disconnected:`, context.error || 'Unknown error');
        if (context.connection && context.connection.error) {
            console.log(`${new Date().toISOString()} SEVERE Disconnection details:`, context.connection.error);
        }
        // Reconnect is disabled, so the timer has nothing left to send on
        stopSending();
    });

    // Connect
    console.log(`${new Date().toISOString()} INFO Connecting to ${endpoint.host}:${endpoint.port}`);
    container.connect(connectionOptions);
}

Explanation

AMQP 1.0 client implementation with SSL/TLS configuration using:

  • Python: python-qpid-proton library
  • .NET: AMQPNetLite library
  • Go: Azure/go-amqp library
  • Java: Apache Qpid Proton-J library
  • JavaScript: rhea library

Direct connection and publishing

def direct_publish():
    """Publish to the endpoint described by the ENDPOINT_* settings.

    Any exception raised while connecting or publishing is logged as a
    warning and not propagated.
    """
    try:
        # Endpoint assembled from the module-level configuration
        endpoint = dict(host=ENDPOINT_HOST, port=ENDPOINT_PORT, target=ENDPOINT_TARGET)

        logging.info("Using pre-known endpoint %s" % (endpoint,))
        amqp_connect_and_publish(endpoint)

    except Exception as exc:
        logging.warning("An exception occurred while running direct_publish: %s" % exc)
// Builds the endpoint from the ENDPOINT_* settings (the port is parsed as an
// integer) and publishes to it. Any exception is logged and not rethrown.
private static async Task DirectPublishAsync()
{
    try
    {
        // Endpoint assembled from the static configuration fields
        var knownEndpoint = new DeliveryEndpoint
        {
            Host = ENDPOINT_HOST,
            Port = int.Parse(ENDPOINT_PORT),
            Target = ENDPOINT_TARGET
        };

        var endpointJson = JsonSerializer.Serialize(knownEndpoint);
        LogInfo($"Using pre-known endpoint {endpointJson}");

        await AmqpConnectAndPublishAsync(knownEndpoint);
    }
    catch (Exception ex)
    {
        LogError($"An exception occurred while running DirectPublish: {ex.Message}");
    }
}
// directPublish builds the endpoint map from the ENDPOINT_* settings and
// publishes to it. An error from amqpConnectAndPublish is logged, not returned.
func directPublish() {
    // Endpoint assembled from the package-level configuration
    knownEndpoint := map[string]string{
        "host":   ENDPOINT_HOST,
        "port":   ENDPOINT_PORT,
        "target": ENDPOINT_TARGET,
    }
    log.Printf("Using pre-known endpoint %v", knownEndpoint)

    if publishErr := amqpConnectAndPublish(knownEndpoint); publishErr != nil {
        log.Printf("An exception occurred while running direct_publish: %v", publishErr)
    }
}
/**
 * Builds the endpoint map from the ENDPOINT_* settings and publishes to it.
 * Any exception is logged at SEVERE level and not propagated.
 */
private static void directPublish() {
    try {
        // Endpoint assembled from the static configuration constants
        final Map<String, String> knownEndpoint = new HashMap<>();
        knownEndpoint.put("host", ENDPOINT_HOST);
        knownEndpoint.put("port", ENDPOINT_PORT);
        knownEndpoint.put("target", ENDPOINT_TARGET);

        logger.info("Using pre-known endpoint " + knownEndpoint);

        amqpConnectAndPublish(knownEndpoint);
    } catch (Exception failure) {
        logger.log(Level.SEVERE, "An exception occurred while running direct_publish", failure);
    }
}
/**
 * Builds the endpoint object from the ENDPOINT_* settings and starts publishing
 * to it. An exception thrown synchronously is logged as a warning.
 */
function directPublish() {
    try {
        // Endpoint assembled from the module-level configuration
        const knownEndpoint = { host: ENDPOINT_HOST, port: ENDPOINT_PORT, target: ENDPOINT_TARGET };
        const endpointJson = JSON.stringify(knownEndpoint);

        console.log(`${new Date().toISOString()} INFO Using pre-known endpoint ${endpointJson}`);
        amqpConnectAndPublish(knownEndpoint);

    } catch (err) {
        console.log(`${new Date().toISOString()} WARNING An exception occurred while running direct_publish: ${err.message}`);
    }
}

Explanation

Each example builds the endpoint from the pre-configured host, port, and target address and connects to it directly, without calling the Actor API to create a delivery first.

Full examples

Language Location Description
Python examples/delivery-direct/python Direct AMQP connection with known endpoint
.NET examples/delivery-direct/dotnet Direct AMQP connection with known endpoint
Go examples/delivery-direct/go Direct AMQP connection with known endpoint
Java examples/delivery-direct/java Direct AMQP connection with known endpoint
JavaScript examples/delivery-direct/javascript Direct AMQP connection with known endpoint