Direct delivery connection with known endpoint
This example demonstrates how to connect directly to a known AMQP endpoint for publishing messages without using the Actor API to create a delivery. This is useful when you already have the endpoint information from a previous delivery creation or when working with persistent delivery endpoints.
Prerequisites
General Requirements
- Valid client certificate and private key in PEM format
- CA certificate in PEM format
- Known AMQP endpoint information (host, port, target address)
Language-Specific Requirements
- Python 3.x
- python-qpid-proton library
- .NET 6.0 or later
- AMQPNetLite NuGet package
- Go 1.18 or later
- github.com/Azure/go-amqp package
- Java 11 or later
- Apache Qpid Proton-J library
- Node.js 16 or later
- rhea library
Environment Variables
Variable | Description | Example |
---|---|---|
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM |
Path to client certificate and private key in PEM format | /path/to/client-cert-and-key.pem |
CA_CERTIFICATE_PEM |
Path to CA certificate in PEM format | /path/to/ca-cert.pem |
MESSAGE_APPLICATION_PROPERTIES_JSON |
Message properties in JSON format | {"messageType": "TEST", "publisherId": "XX99999", "publicationId": "XX99999:TEST", "originatingCountry": "XX", "protocolVersion": "TEST:0.0.0", "quadTree": ",1004,"} |
ENDPOINT_HOST |
AMQP endpoint hostname | amqp.example.com |
ENDPOINT_PORT |
AMQP endpoint port | 5671 |
ENDPOINT_TARGET |
AMQP target address for delivery | delivery-target-address |
Configuration
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM=os.environ.get("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM=os.environ.get("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
MESSAGE_APPLICATION_PROPERTIES_JSON=os.environ.get("MESSAGE_APPLICATION_PROPERTIES_JSON", "message_application_properties_json")
# Pre-known endpoint information
ENDPOINT_HOST=os.environ.get("ENDPOINT_HOST", "amqp_endpoint_host")
ENDPOINT_PORT=os.environ.get("ENDPOINT_PORT", "amqp_endpoint_port")
ENDPOINT_TARGET=os.environ.get("ENDPOINT_TARGET", "amqp_endpoint_target_address")
private static readonly string ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = Environment.GetEnvironmentVariable("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM") ?? "pem_with_x509_certificate_chain_and_private_key";
private static readonly string CA_CERTIFICATE_PEM = Environment.GetEnvironmentVariable("CA_CERTIFICATE_PEM") ?? "pem_with_x509_certificate";
private static readonly string MESSAGE_APPLICATION_PROPERTIES_JSON = Environment.GetEnvironmentVariable("MESSAGE_APPLICATION_PROPERTIES_JSON") ?? "message_application_properties_json";
// Pre-known endpoint information
private static readonly string ENDPOINT_HOST = Environment.GetEnvironmentVariable("ENDPOINT_HOST") ?? "amqp_endpoint_host";
private static readonly string ENDPOINT_PORT = Environment.GetEnvironmentVariable("ENDPOINT_PORT") ?? "amqp_endpoint_port";
private static readonly string ENDPOINT_TARGET = Environment.GetEnvironmentVariable("ENDPOINT_TARGET") ?? "amqp_endpoint_target_address";
var (
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
MESSAGE_APPLICATION_PROPERTIES_JSON = getEnv("MESSAGE_APPLICATION_PROPERTIES_JSON", "message_application_properties_json")
// Pre-known endpoint information
ENDPOINT_HOST = getEnv("ENDPOINT_HOST", "amqp_endpoint_host")
ENDPOINT_PORT = getEnv("ENDPOINT_PORT", "amqp_endpoint_port")
ENDPOINT_TARGET = getEnv("ENDPOINT_TARGET", "amqp_endpoint_target_address")
)
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
private static final String ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key");
private static final String CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate");
private static final String MESSAGE_APPLICATION_PROPERTIES_JSON = getEnv("MESSAGE_APPLICATION_PROPERTIES_JSON", "message_application_properties_json");
// Pre-known endpoint information
private static final String ENDPOINT_HOST = getEnv("ENDPOINT_HOST", "amqp_endpoint_host");
private static final String ENDPOINT_PORT = getEnv("ENDPOINT_PORT", "amqp_endpoint_port");
private static final String ENDPOINT_TARGET = getEnv("ENDPOINT_TARGET", "amqp_endpoint_target_address");
private static String getEnv(String key, String defaultValue) {
String value = System.getenv(key);
return value != null ? value : defaultValue;
}
const ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = process.env.ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM || 'pem_with_x509_certificate_chain_and_private_key';
const CA_CERTIFICATE_PEM = process.env.CA_CERTIFICATE_PEM || 'pem_with_x509_certificate';
const MESSAGE_APPLICATION_PROPERTIES_JSON = process.env.MESSAGE_APPLICATION_PROPERTIES_JSON || 'message_application_properties_json';
// Pre-known endpoint information
const ENDPOINT_HOST = process.env.ENDPOINT_HOST || 'amqp_endpoint_host';
const ENDPOINT_PORT = process.env.ENDPOINT_PORT || 'amqp_endpoint_port';
const ENDPOINT_TARGET = process.env.ENDPOINT_TARGET || 'amqp_endpoint_target_address';
// Message counter
let messageCount = 0;
Explanation
Configuration by environment variables for certificates and pre-known endpoint information
AMQP 1.0 Client
def amqp_create_ssl_config():
ssl_config = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_config.set_peer_authentication(SSLDomain.ANONYMOUS_PEER)
ssl_config.set_credentials(cert_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, key_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, password=None)
ssl_config.set_trusted_ca_db(CA_CERTIFICATE_PEM)
return ssl_config
class Sender(MessagingHandler):
def __init__(self, endpoint):
super(Sender, self).__init__()
self.__endpoint = endpoint
self.message_count = 0
def on_start(self, event):
logging.debug("Container reactor started")
container = event.container
endpoint = self.__endpoint
# Step 1: connect
ssl_config = amqp_create_ssl_config()
amqp_url = "amqps://%s:%s" % (endpoint["host"], endpoint["port"])
connection = container.connect(amqp_url, ssl_domain = ssl_config, reconnect = False, heartbeat = 5)
# Step 2: create a sending link using the target address of the endpoint
self.sender_link = container.create_sender(connection, endpoint["target"])
def send_message(self):
# Increment message counter
self.message_count += 1
# Create dynamic message content with counter and timestamp
body_text = f"Hello World! Message #{self.message_count} at {datetime.now().strftime('%H:%M:%S')}"
body_binary = body_text.encode('utf-8')
message = Message()
# Make sure to use the Data type for the body to ensure it is sent as binary
message.inferred = True
message.body = body_binary
message.properties = json.loads(MESSAGE_APPLICATION_PROPERTIES_JSON)
# Format properties in sorted order for consistent logging
properties_dict = json.loads(MESSAGE_APPLICATION_PROPERTIES_JSON)
sorted_properties = json.dumps(properties_dict, sort_keys=True)
logging.info("Sending message: body='%s', properties=%s", body_text, sorted_properties)
self.sender_link.send(message)
def on_link_opened(self, event):
# Send first message when link is opened (like Java onLinkRemoteOpen)
if hasattr(event.link, 'credit') and event.link.credit > 0:
self.send_message()
def on_settled(self, event):
# Schedule next message after 1 second
event.container.schedule(1.0, self)
def on_timer_task(self, event):
# Send next message
self.send_message()
def amqp_connect_and_publish(endpoint):
sender = Sender(endpoint)
container = Container(sender)
thread = threading.Thread(name = "AMQPClient", target = container.run, daemon = True)
thread.start()
while thread.is_alive():
time.sleep(1)
private static ConnectionFactory CreateConnectionFactory()
{
var factory = new ConnectionFactory();
try
{
// Configure SSL/TLS with client certificate for SASL EXTERNAL
// Read the combined PEM file content
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
factory.SSL.ClientCertificates.Add(clientCert);
// Enable SSL/TLS
factory.SSL.Protocols = System.Security.Authentication.SslProtocols.Tls13;
// Load CA certificate for validation
var caCertPem = File.ReadAllText(CA_CERTIFICATE_PEM);
var caCert = X509Certificate2.CreateFromPem(caCertPem);
factory.SSL.RemoteCertificateValidationCallback = (sender, cert, chain, errors) => {
// Validate against CA certificate
return ValidateCertificate(cert as X509Certificate2, caCert);
};
// Extract common name for SASL EXTERNAL
var commonName = ExtractCommonName(clientCert.Subject);
LogDebug($"Certificate Subject: {clientCert.Subject}");
LogDebug($"Extracted Common Name: {commonName}");
// Configure custom SASL EXTERNAL profile that sends only the CN value
factory.SASL.Profile = new CustomSaslExternalProfile(commonName);
}
catch (Exception ex)
{
LogError($"Error loading certificates: {ex.Message}");
throw;
}
return factory;
}
private static bool ValidateCertificate(X509Certificate2? serverCert, X509Certificate2 caCert)
{
if (serverCert == null || caCert == null)
return false;
var chain = new X509Chain();
chain.ChainPolicy.ExtraStore.Add(caCert);
chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;
return chain.Build(serverCert);
}
private static string ExtractCommonName(string subjectDn)
{
// Extract CN value from Distinguished Name (e.g., "CN=XX99999, O=Company" -> "XX99999")
var parts = subjectDn.Split(',');
foreach (var part in parts)
{
var trimmed = part.Trim();
if (trimmed.StartsWith("CN="))
{
return trimmed.Substring(3);
}
}
return subjectDn; // Fallback to full DN if CN not found
}
// Custom SASL EXTERNAL profile that sends only the CN value
public class CustomSaslExternalProfile : SaslProfile
{
private readonly string identity;
public CustomSaslExternalProfile(string identity) : base(new Symbol("EXTERNAL"))
{
this.identity = identity;
}
protected override DescribedList GetStartCommand(string hostname)
{
return new SaslInit()
{
Mechanism = "EXTERNAL",
InitialResponse = Encoding.UTF8.GetBytes(identity)
};
}
protected override DescribedList OnCommand(DescribedList command)
{
// For EXTERNAL, we should only need the initial response
return null!;
}
protected override ITransport UpgradeTransport(ITransport transport)
{
// No transport upgrade needed for EXTERNAL
return transport;
}
}
private static async Task AmqpConnectAndPublishAsync(DeliveryEndpoint endpoint)
{
var factory = CreateConnectionFactory();
// Extract CN from certificate for user identity
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
var commonName = ExtractCommonName(clientCert.Subject);
// Create connection with proper user identity
var address = new Address($"amqps://{endpoint.Host}:{endpoint.Port}");
var connection = await factory.CreateAsync(address);
var session = new Session(connection);
var sender = new SenderLink(session, "sender-link", endpoint.Target);
var messageProperties = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(MESSAGE_APPLICATION_PROPERTIES_JSON);
LogDebug("Container reactor started");
int messageCount = 0;
// Send messages continuously
while (true)
{
// Increment message counter
messageCount++;
// Create dynamic message content with counter and timestamp
var bodyText = $"Hello World! Message #{messageCount} at {DateTime.Now:HH:mm:ss}";
var bodyBinary = Encoding.UTF8.GetBytes(bodyText);
var message = new Message()
{
BodySection = new Data() { Binary = bodyBinary },
ApplicationProperties = new ApplicationProperties()
};
foreach (var prop in messageProperties)
{
// Convert JsonElement to proper .NET types for AMQP
object value = prop.Value.ValueKind switch
{
JsonValueKind.String => prop.Value.GetString(),
JsonValueKind.Number => prop.Value.GetInt32(),
JsonValueKind.True => true,
JsonValueKind.False => false,
JsonValueKind.Null => null,
_ => prop.Value.ToString()
};
message.ApplicationProperties[prop.Key] = value;
}
// Format properties in sorted order for consistent logging
var messagePropsDict = JsonSerializer.Deserialize<Dictionary<string, JsonElement>>(MESSAGE_APPLICATION_PROPERTIES_JSON);
var sortedPropsJson = JsonSerializer.Serialize(messagePropsDict, new JsonSerializerOptions { WriteIndented = false });
LogInfo($"Sending message: body='{bodyText}', properties={sortedPropsJson}");
await sender.SendAsync(message);
// Wait before sending the next message
await Task.Delay(1000);
}
await sender.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
func amqpCreateTLSConfig() (*tls.Config, error) {
// Load client certificate and key
cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA certificate
caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
if err != nil {
return nil, fmt.Errorf("failed to read CA certificate: %v", err)
}
// Create certificate pool with CA
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: false,
MinVersion: tls.VersionTLS13,
}
return tlsConfig, nil
}
type Sender struct {
client *amqp.Conn
session *amqp.Session
sender *amqp.Sender
endpoint map[string]string
messageCount int
}
func (s *Sender) connect() error {
// Step 1: Create TLS configuration
tlsConfig, err := amqpCreateTLSConfig()
if err != nil {
return err
}
// Create AMQP URL
amqpURL := fmt.Sprintf("amqps://%s:%s", s.endpoint["host"], s.endpoint["port"])
log.Printf("Connecting to %s", amqpURL)
// Step 2: Connect with TLS
opts := &amqp.ConnOptions{
TLSConfig: tlsConfig,
SASLType: amqp.SASLTypeExternal(""),
IdleTimeout: 0,
MaxFrameSize: 65536,
ContainerID: "",
HostName: s.endpoint["host"],
Properties: nil,
}
ctx := context.Background()
client, err := amqp.Dial(ctx, amqpURL, opts)
if err != nil {
return fmt.Errorf("failed to connect: %v", err)
}
s.client = client
// Create session
session, err := client.NewSession(ctx, nil)
if err != nil {
return fmt.Errorf("failed to create session: %v", err)
}
s.session = session
// Step 3: Create sender link using the target address
sender, err := session.NewSender(ctx, s.endpoint["target"], nil)
if err != nil {
return fmt.Errorf("failed to create sender: %v", err)
}
s.sender = sender
log.Println("Container reactor started")
return nil
}
func (s *Sender) sendMessage() error {
// Parse message properties from JSON
var properties map[string]interface{}
err := json.Unmarshal([]byte(MESSAGE_APPLICATION_PROPERTIES_JSON), &properties)
if err != nil {
return fmt.Errorf("failed to parse message properties: %v", err)
}
// Increment message counter
s.messageCount++
// Create dynamic message content with counter and timestamp
bodyText := fmt.Sprintf("Hello World! Message #%d at %s", s.messageCount, time.Now().Format("15:04:05"))
// Create message
msg := &amqp.Message{
Data: [][]byte{[]byte(bodyText)},
ApplicationProperties: properties,
}
ctx := context.Background()
// Format message for logging with sorted properties
// Create a map to ensure consistent ordering
propsMap := make(map[string]interface{})
for k, v := range msg.ApplicationProperties {
propsMap[k] = v
}
// Marshal with sorted keys
propsJSON, _ := json.Marshal(propsMap)
log.Printf("Sending message: body='%s', properties=%s", bodyText, string(propsJSON))
// Send message
err = s.sender.Send(ctx, msg, nil)
if err != nil {
return fmt.Errorf("failed to send message: %v", err)
}
return nil
}
func (s *Sender) run() {
for {
err := s.sendMessage()
if err != nil {
log.Printf("Error sending message: %v", err)
}
time.Sleep(1 * time.Second)
}
}
func (s *Sender) close() {
if s.sender != nil {
s.sender.Close(context.Background())
}
if s.session != nil {
s.session.Close(context.Background())
}
if s.client != nil {
s.client.Close()
}
}
func amqpConnectAndPublish(endpoint map[string]string) error {
sender := &Sender{
endpoint: endpoint,
}
err := sender.connect()
if err != nil {
return err
}
defer sender.close()
// Run sender in continuous loop
sender.run()
return nil
}
private static class SenderHandler extends BaseHandler {
private final Map<String, String> endpoint;
private final AtomicInteger messageCount = new AtomicInteger(0);
private Sender sender;
private final SimpleDateFormat timeFormat = new SimpleDateFormat("HH:mm:ss");
private SSLContext sslContext;
public SenderHandler(Map<String, String> endpoint) {
this.endpoint = endpoint;
}
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}
@Override
public void onConnectionInit(Event event) {
logger.fine("Connection initialized");
Connection connection = event.getConnection();
connection.setHostname(endpoint.get("host"));
connection.setContainer("java-delivery-direct-example");
connection.open();
}
@Override
public void onConnectionBound(Event event) {
logger.fine("Connection bound, configuring transport");
Transport transport = event.getTransport();
if (sslContext != null) {
// Configure SASL EXTERNAL
Sasl sasl = transport.sasl();
sasl.setMechanisms("EXTERNAL");
// Configure SSL
SslDomain sslDomain = Proton.sslDomain();
sslDomain.init(SslDomain.Mode.CLIENT);
sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
sslDomain.setSslContext(sslContext);
transport.ssl(sslDomain);
}
}
@Override
public void onConnectionRemoteOpen(Event event) {
logger.fine("Connection opened");
Connection connection = event.getConnection();
Session session = connection.session();
session.open();
}
@Override
public void onSessionRemoteOpen(Event event) {
logger.fine("Session opened");
Session session = event.getSession();
Target target = new Target();
String targetAddress = endpoint.get("target");
target.setAddress(targetAddress);
sender = session.sender(targetAddress);
sender.setTarget(target);
Source source = new Source();
sender.setSource(source);
sender.open();
}
@Override
public void onLinkRemoteClose(Event event) {
Link link = event.getLink();
logger.severe("Link remote close - State: " + link.getRemoteState());
if (link.getRemoteCondition() != null) {
logger.severe("Condition: " + link.getRemoteCondition().getCondition());
logger.severe("Description: " + link.getRemoteCondition().getDescription());
}
}
@Override
public void onLinkRemoteOpen(Event event) {
logger.fine("Sender link opened, ready to send messages");
if (event.getLink() instanceof Sender && event.getSender().getCredit() > 0) {
sendMessage();
}
}
@Override
public void onDelivery(Event event) {
Delivery delivery = event.getDelivery();
if (delivery.getRemoteState() != null) {
delivery.settle();
// Schedule next message after 1 second
event.getReactor().schedule(1000, this);
}
}
@Override
public void onTimerTask(Event event) {
if (sender != null && sender.getCredit() > 0) {
sendMessage();
}
}
private void sendMessage() {
try {
// Increment message counter
int count = messageCount.incrementAndGet();
// Create dynamic message content with counter and timestamp
String bodyText = String.format("Hello World! Message #%d at %s", count, timeFormat.format(new Date()));
// Create message
Message message = Message.Factory.create();
message.setBody(new Data(new Binary(bodyText.getBytes(StandardCharsets.UTF_8))));
// Parse and set application properties
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> properties = mapper.readValue(MESSAGE_APPLICATION_PROPERTIES_JSON,
new TypeReference<Map<String, Object>>() {
});
message.setApplicationProperties(new ApplicationProperties(properties));
// Format properties for logging
String sortedProperties = mapper.writeValueAsString(new TreeMap<>(properties));
logger.info(String.format("Sending message: body='%s', properties=%s", bodyText, sortedProperties));
// Send message
byte[] encodedMessage = new byte[1024];
int encodedSize = message.encode(encodedMessage, 0, encodedMessage.length);
Delivery delivery = sender.delivery(new byte[0]);
sender.send(encodedMessage, 0, encodedSize);
sender.advance();
} catch (Exception e) {
logger.log(Level.WARNING, "Error sending message", e);
e.printStackTrace();
}
}
@Override
public void onTransportError(Event event) {
logger.log(Level.SEVERE, "Transport error: " + event.getTransport().getCondition());
}
}
private static SSLContext createSSLContext() throws Exception {
// Add BouncyCastle provider
Security.addProvider(new BouncyCastleProvider());
// Create SSL context
SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
// Load client certificate and key
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(null);
// Parse certificates and key using BouncyCastle
List<java.security.cert.Certificate> certChain = new ArrayList<>();
PrivateKey privateKey = null;
try (PEMParser pemParser = new PEMParser(new FileReader(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM))) {
Object object;
JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC");
JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter().setProvider("BC");
while ((object = pemParser.readObject()) != null) {
if (object instanceof X509CertificateHolder) {
certChain.add(certConverter.getCertificate((X509CertificateHolder) object));
} else if (object instanceof PEMKeyPair) {
privateKey = keyConverter.getPrivateKey(((PEMKeyPair) object).getPrivateKeyInfo());
}
}
}
// Add certificate chain and key to keystore
if (privateKey != null && !certChain.isEmpty()) {
keyStore.setKeyEntry("client", privateKey, new char[0], certChain.toArray(new java.security.cert.Certificate[0]));
} else {
throw new IllegalStateException("Failed to load client certificate and key");
}
// Initialize key manager
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, new char[0]);
// Load CA certificate
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
try (FileInputStream fis = new FileInputStream(CA_CERTIFICATE_PEM)) {
java.security.cert.Certificate caCert = cf.generateCertificate(fis);
trustStore.setCertificateEntry("ca", caCert);
}
// Initialize trust manager
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);
// Initialize SSL context
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
return sslContext;
}
private static void amqpConnectAndPublish(Map<String, String> endpoint) throws Exception {
// Configure SSL
SSLContext sslContext = createSSLContext();
// Create handler
SenderHandler handler = new SenderHandler(endpoint);
handler.setSslContext(sslContext);
// Create reactor
Reactor reactor = Proton.reactor(handler);
// Connect to host with SSL and SASL configuration
String host = endpoint.get("host");
int port = Integer.parseInt(endpoint.get("port"));
// Use reactor's connection method with proper SSL/SASL setup
reactor.connectionToHost(host, port, handler);
// Run reactor
reactor.run();
}
// No separate SSL config function needed - rhea expects these in connection options
function sendMessage(sender) {
// Increment message counter
messageCount++;
// Create dynamic message content with counter and timestamp
const now = new Date();
const timeString = now.toTimeString().split(' ')[0]; // HH:MM:SS format
const bodyText = `Hello World! Message #${messageCount} at ${timeString}`;
const bodyBinary = Buffer.from(bodyText, 'utf8');
// Parse application properties
const properties = JSON.parse(MESSAGE_APPLICATION_PROPERTIES_JSON);
// Create message with explicit data section for binary body
const message = {
// Use data_section to ensure body is sent as AMQP Data type (binary)
body: rhea.message.data_section(bodyBinary),
application_properties: properties
};
// Format properties in sorted order for consistent logging
const sortedProperties = JSON.stringify(properties, Object.keys(properties).sort());
console.log(`${new Date().toISOString()} INFO Sending message: body='${bodyText}', properties=${sortedProperties}`);
sender.send(message);
}
function amqpConnectAndPublish(endpoint) {
const container = rhea.create_container();
// Connection options with TLS configuration
const connectionOptions = {
host: endpoint.host,
port: parseInt(endpoint.port),
transport: 'tls',
// Client certificate authentication
key: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
cert: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
// CA certificate
ca: [fs.readFileSync(CA_CERTIFICATE_PEM)],
// Connection settings
reconnect: false,
initial_reconnect_delay: 0,
max_reconnect_delay: 0,
idle_time_out: 0,
container_id: 'javascript-delivery-direct-example',
// Enable SASL EXTERNAL for client certificate authentication
enable_sasl_external: true
};
let sender = null;
let sendTimer = null;
container.on('connection_open', function (context) {
console.log(`${new Date().toISOString()} DEBUG Connection opened`);
console.log(`${new Date().toISOString()} DEBUG Creating sender for target: ${endpoint.target}`);
// Create sender
sender = context.connection.open_sender({
target: {
address: endpoint.target
}
});
});
container.on('sendable', function (context) {
console.log(`${new Date().toISOString()} DEBUG Sender link opened, ready to send messages`);
// Send first message immediately
sendMessage(context.sender);
// Schedule subsequent messages every 1 second
sendTimer = setInterval(() => {
if (context.sender.sendable()) {
sendMessage(context.sender);
}
}, 1000);
});
container.on('accepted', function (context) {
// Message was accepted by the broker
});
container.on('rejected', function (context) {
console.log(`${new Date().toISOString()} WARNING Message rejected:`, context.delivery.remote.error);
});
container.on('connection_close', function (context) {
console.log(`${new Date().toISOString()} INFO Connection closed`);
if (context.connection && context.connection.error) {
console.log(`${new Date().toISOString()} SEVERE Connection close error:`, context.connection.error);
}
if (sendTimer) {
clearInterval(sendTimer);
}
});
container.on('connection_error', function (context) {
const error = context.connection.error || context.error;
console.log(`${new Date().toISOString()} SEVERE Connection error:`, error);
});
container.on('error', function (context) {
console.log(`${new Date().toISOString()} SEVERE Error:`, context.error);
});
container.on('disconnected', function (context) {
console.log(`${new Date().toISOString()} SEVERE Disconnected:`, context.error || 'Unknown error');
if (context.connection && context.connection.error) {
console.log(`${new Date().toISOString()} SEVERE Disconnection details:`, context.connection.error);
}
});
// Connect
console.log(`${new Date().toISOString()} INFO Connecting to ${endpoint.host}:${endpoint.port}`);
container.connect(connectionOptions);
}
Explanation
AMQP 1.0 client implementation with SSL/TLS configuration using:
- Python: python-qpid-proton library
- .NET: AMQPNetLite library
- Go: Azure/go-amqp library
- Java: Apache Qpid Proton-J library
- JavaScript: rhea library
Direct connection and publishing
def direct_publish():
try:
# Create endpoint from environment variables
endpoint = {
"host": ENDPOINT_HOST,
"port": ENDPOINT_PORT,
"target": ENDPOINT_TARGET
}
logging.info("Using pre-known endpoint %s" % endpoint)
amqp_connect_and_publish(endpoint)
except Exception as e:
logging.warning("An exception occurred while running direct_publish: %s" % e)
private static async Task DirectPublishAsync()
{
try
{
// Create endpoint from environment variables
var endpoint = new DeliveryEndpoint
{
Host = ENDPOINT_HOST,
Port = int.Parse(ENDPOINT_PORT),
Target = ENDPOINT_TARGET
};
LogInfo($"Using pre-known endpoint {JsonSerializer.Serialize(endpoint)}");
await AmqpConnectAndPublishAsync(endpoint);
}
catch (Exception e)
{
LogError($"An exception occurred while running DirectPublish: {e.Message}");
}
}
func directPublish() {
// Create endpoint from environment variables
endpoint := map[string]string{
"host": ENDPOINT_HOST,
"port": ENDPOINT_PORT,
"target": ENDPOINT_TARGET,
}
log.Printf("Using pre-known endpoint %v", endpoint)
err := amqpConnectAndPublish(endpoint)
if err != nil {
log.Printf("An exception occurred while running direct_publish: %v", err)
}
}
private static void directPublish() {
try {
// Create endpoint from environment variables
Map<String, String> endpoint = new HashMap<>();
endpoint.put("host", ENDPOINT_HOST);
endpoint.put("port", ENDPOINT_PORT);
endpoint.put("target", ENDPOINT_TARGET);
logger.info("Using pre-known endpoint " + endpoint);
amqpConnectAndPublish(endpoint);
} catch (Exception e) {
logger.log(Level.SEVERE, "An exception occurred while running direct_publish", e);
}
}
function directPublish() {
try {
// Create endpoint from environment variables
const endpoint = {
host: ENDPOINT_HOST,
port: ENDPOINT_PORT,
target: ENDPOINT_TARGET
};
console.log(`${new Date().toISOString()} INFO Using pre-known endpoint ${JSON.stringify(endpoint)}`);
amqpConnectAndPublish(endpoint);
} catch (error) {
console.log(`${new Date().toISOString()} WARNING An exception occurred while running direct_publish: ${error.message}`);
}
}
Explanation
Direct connection to AMQP endpoint using pre-configured host, port, and target address
Full examples
Language | Location | Description |
---|---|---|
Python | examples/delivery-direct/python | Direct AMQP connection with known endpoint |
.NET | examples/delivery-direct/dotnet | Direct AMQP connection with known endpoint |
Go | examples/delivery-direct/go | Direct AMQP connection with known endpoint |
Java | examples/delivery-direct/java | Direct AMQP connection with known endpoint |
JavaScript | examples/delivery-direct/javascript | Direct AMQP connection with known endpoint |