Direct subscription connection with known endpoint
This example demonstrates how to connect directly to a known AMQP endpoint for receiving messages without using the Actor API to create a subscription. This is useful when you already have the endpoint information from a previous subscription creation or when working with persistent subscription endpoints.
Prerequisites
General Requirements
- Valid client certificate and private key in PEM format
- CA certificate in PEM format
- Known AMQP endpoint information (host, port, source address)
Language-Specific Requirements
- Python 3.x
- python-qpid-proton library
- .NET 6.0 or later
- AMQPNetLite NuGet package
- Go 1.18 or later
- github.com/Azure/go-amqp package
- Java 11 or later
- Apache Qpid Proton-J library
- Node.js 16 or later
- rhea library
Environment Variables
Variable | Description | Example |
---|---|---|
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM |
Path to client certificate and private key in PEM format | /path/to/client-cert-and-key.pem |
CA_CERTIFICATE_PEM |
Path to CA certificate in PEM format | /path/to/ca-cert.pem |
ENDPOINT_HOST |
AMQP endpoint hostname | amqp.example.com |
ENDPOINT_PORT |
AMQP endpoint port | 5671 |
ENDPOINT_SOURCE |
AMQP source address for subscription | subscription-source-address |
Configuration
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM=os.environ.get("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM=os.environ.get("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
# Pre-known endpoint information
ENDPOINT_HOST=os.environ.get("ENDPOINT_HOST", "amqp_endpoint_host")
ENDPOINT_PORT=os.environ.get("ENDPOINT_PORT", "amqp_endpoint_port")
ENDPOINT_SOURCE=os.environ.get("ENDPOINT_SOURCE", "amqp_endpoint_source_address")
private static readonly string ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = Environment.GetEnvironmentVariable("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM") ?? "pem_with_x509_certificate_chain_and_private_key";
private static readonly string CA_CERTIFICATE_PEM = Environment.GetEnvironmentVariable("CA_CERTIFICATE_PEM") ?? "pem_with_x509_certificate";
// Pre-known endpoint information
private static readonly string ENDPOINT_HOST = Environment.GetEnvironmentVariable("ENDPOINT_HOST") ?? "amqp_endpoint_host";
private static readonly string ENDPOINT_PORT = Environment.GetEnvironmentVariable("ENDPOINT_PORT") ?? "amqp_endpoint_port";
private static readonly string ENDPOINT_SOURCE = Environment.GetEnvironmentVariable("ENDPOINT_SOURCE") ?? "amqp_endpoint_source_address";
var (
ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key")
CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate")
// Pre-known endpoint information
ENDPOINT_HOST = getEnv("ENDPOINT_HOST", "amqp_endpoint_host")
ENDPOINT_PORT = getEnv("ENDPOINT_PORT", "amqp_endpoint_port")
ENDPOINT_SOURCE = getEnv("ENDPOINT_SOURCE", "amqp_endpoint_source_address")
)
func getEnv(key, defaultValue string) string {
if value := os.Getenv(key); value != "" {
return value
}
return defaultValue
}
private static final String ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = getEnv("ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM", "pem_with_x509_certificate_chain_and_private_key");
private static final String CA_CERTIFICATE_PEM = getEnv("CA_CERTIFICATE_PEM", "pem_with_x509_certificate");
// Pre-known endpoint information
private static final String ENDPOINT_HOST = getEnv("ENDPOINT_HOST", "amqp_endpoint_host");
private static final String ENDPOINT_PORT = getEnv("ENDPOINT_PORT", "amqp_endpoint_port");
private static final String ENDPOINT_SOURCE = getEnv("ENDPOINT_SOURCE", "amqp_endpoint_source_address");
private static String getEnv(String key, String defaultValue) {
String value = System.getenv(key);
return value != null ? value : defaultValue;
}
const ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM = process.env.ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM || 'pem_with_x509_certificate_chain_and_private_key';
const CA_CERTIFICATE_PEM = process.env.CA_CERTIFICATE_PEM || 'pem_with_x509_certificate';
// Pre-known endpoint information
const ENDPOINT_HOST = process.env.ENDPOINT_HOST || 'amqp_endpoint_host';
const ENDPOINT_PORT = process.env.ENDPOINT_PORT || 'amqp_endpoint_port';
const ENDPOINT_SOURCE = process.env.ENDPOINT_SOURCE || 'amqp_endpoint_source_address';
Explanation
Configuration by environment variables for certificates and pre-known endpoint information
AMQP 1.0 Client
def amqp_create_ssl_config():
ssl_config = SSLDomain(SSLDomain.MODE_CLIENT)
ssl_config.set_peer_authentication(SSLDomain.ANONYMOUS_PEER)
ssl_config.set_credentials(cert_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, key_file=ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, password=None)
ssl_config.set_trusted_ca_db(CA_CERTIFICATE_PEM)
return ssl_config
class Receiver(MessagingHandler):
def __init__(self, endpoint):
super(Receiver, self).__init__()
self.__endpoint = endpoint
def on_start(self, event):
logging.debug("Container reactor started")
container = event.container
endpoint = self.__endpoint
# Step 1: connect
ssl_config = amqp_create_ssl_config()
amqp_url = "amqps://%s:%s" % (endpoint["host"], endpoint["port"])
connection = container.connect(amqp_url, ssl_domain = ssl_config, reconnect = False, heartbeat = 5)
# Step 2: create a receiving link using the source address of the endpoint
container.create_receiver(connection, endpoint["source"])
def on_message(self, event):
# Decode binary body as UTF-8
body_binary = event.message.body
if isinstance(body_binary, bytes):
body_text = body_binary.decode('utf-8')
elif hasattr(body_binary, 'tobytes'):
# Handle memory view objects
body_text = body_binary.tobytes().decode('utf-8')
else:
body_text = str(body_binary)
# Format application properties in sorted order
app_props = {}
if hasattr(event.message, 'properties') and event.message.properties:
app_props = event.message.properties
elif hasattr(event.message, 'application_properties') and event.message.application_properties:
app_props = event.message.application_properties
sorted_props_json = json.dumps(app_props, sort_keys=True)
logging.info("Message received: body='%s', properties=%s", body_text, sorted_props_json)
def amqp_connect_and_listen(endpoint):
receiver = Receiver(endpoint)
container = Container(receiver)
thread = threading.Thread(name = "AMQPClient", target = container.run, daemon = True)
thread.start()
while thread.is_alive():
time.sleep(1)
private static ConnectionFactory CreateConnectionFactory()
{
var factory = new ConnectionFactory();
try
{
// Configure SSL/TLS with client certificate for SASL EXTERNAL
// Read the combined PEM file content
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
factory.SSL.ClientCertificates.Add(clientCert);
// Enable SSL/TLS
factory.SSL.Protocols = System.Security.Authentication.SslProtocols.Tls13;
// Load CA certificate for validation
var caCertPem = File.ReadAllText(CA_CERTIFICATE_PEM);
var caCert = X509Certificate2.CreateFromPem(caCertPem);
factory.SSL.RemoteCertificateValidationCallback = (sender, cert, chain, errors) => {
// Validate against CA certificate
return ValidateCertificate(cert as X509Certificate2, caCert);
};
// Extract common name for SASL EXTERNAL
var commonName = ExtractCommonName(clientCert.Subject);
LogDebug($"Certificate Subject: {clientCert.Subject}");
LogDebug($"Extracted Common Name: {commonName}");
// Configure custom SASL EXTERNAL profile that sends only the CN value
factory.SASL.Profile = new CustomSaslExternalProfile(commonName);
}
catch (Exception ex)
{
LogError($"Error loading certificates: {ex.Message}");
throw;
}
return factory;
}
private static bool ValidateCertificate(X509Certificate2? serverCert, X509Certificate2 caCert)
{
if (serverCert == null || caCert == null)
return false;
var chain = new X509Chain();
chain.ChainPolicy.ExtraStore.Add(caCert);
chain.ChainPolicy.RevocationMode = X509RevocationMode.NoCheck;
chain.ChainPolicy.VerificationFlags = X509VerificationFlags.AllowUnknownCertificateAuthority;
return chain.Build(serverCert);
}
private static string ExtractCommonName(string subjectDn)
{
// Extract CN value from Distinguished Name (e.g., "CN=XX99999, O=Company" -> "XX99999")
var parts = subjectDn.Split(',');
foreach (var part in parts)
{
var trimmed = part.Trim();
if (trimmed.StartsWith("CN="))
{
return trimmed.Substring(3);
}
}
return subjectDn; // Fallback to full DN if CN not found
}
// Custom SASL EXTERNAL profile that sends only the CN value
public class CustomSaslExternalProfile : SaslProfile
{
private readonly string identity;
public CustomSaslExternalProfile(string identity) : base(new Symbol("EXTERNAL"))
{
this.identity = identity;
}
protected override DescribedList GetStartCommand(string hostname)
{
return new SaslInit()
{
Mechanism = "EXTERNAL",
InitialResponse = Encoding.UTF8.GetBytes(identity)
};
}
protected override DescribedList OnCommand(DescribedList command)
{
// For EXTERNAL, we should only need the initial response
return null!;
}
protected override ITransport UpgradeTransport(ITransport transport)
{
// No transport upgrade needed for EXTERNAL
return transport;
}
}
private static void PrintMessageDetails(Message message)
{
// Decode binary body as UTF-8
var bodyText = "";
if (message.Body is byte[] bodyBytes)
{
bodyText = Encoding.UTF8.GetString(bodyBytes);
}
else
{
bodyText = message.Body?.ToString() ?? "";
}
// Format application properties as JSON in sorted order
var appPropsJson = "{}";
if (message.ApplicationProperties != null && message.ApplicationProperties.Map != null)
{
var propsDict = new Dictionary<string, object>();
foreach (var kvp in message.ApplicationProperties.Map)
{
propsDict[kvp.Key.ToString()] = kvp.Value;
}
// Sort keys for consistent output
var sortedProps = propsDict.OrderBy(x => x.Key).ToDictionary(x => x.Key, x => x.Value);
appPropsJson = JsonSerializer.Serialize(sortedProps, new JsonSerializerOptions { WriteIndented = false });
}
LogInfo($"Message received: body='{bodyText}', properties={appPropsJson}");
}
private static async Task AmqpConnectAndListenAsync(SubscriptionEndpoint endpoint)
{
var factory = CreateConnectionFactory();
// Extract CN from certificate for user identity
var certAndKeyPem = File.ReadAllText(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM);
var clientCert = X509Certificate2.CreateFromPem(certAndKeyPem, certAndKeyPem);
var commonName = ExtractCommonName(clientCert.Subject);
// Create connection with proper user identity
var address = new Address($"amqps://{endpoint.Host}:{endpoint.Port}");
var connection = await factory.CreateAsync(address);
var session = new Session(connection);
var receiver = new ReceiverLink(session, "receiver-link", endpoint.Source);
LogInfo("Listening for messages. Press Ctrl+C to stop.");
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (sender, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while (!cts.Token.IsCancellationRequested)
{
var message = await receiver.ReceiveAsync(TimeSpan.FromSeconds(1));
if (message != null)
{
PrintMessageDetails(message);
receiver.Accept(message);
}
}
}
catch (OperationCanceledException)
{
LogInfo("Stopping message listener...");
}
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
func amqpCreateTLSConfig() (*tls.Config, error) {
// Load client certificate and key
cert, err := tls.LoadX509KeyPair(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM, ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA certificate
caCert, err := os.ReadFile(CA_CERTIFICATE_PEM)
if err != nil {
return nil, fmt.Errorf("failed to read CA certificate: %v", err)
}
// Create certificate pool with CA
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
InsecureSkipVerify: false,
MinVersion: tls.VersionTLS13,
}
return tlsConfig, nil
}
type Receiver struct {
client *amqp.Conn
session *amqp.Session
receiver *amqp.Receiver
endpoint map[string]string
}
func (r *Receiver) connect() error {
// Step 1: Create TLS configuration
tlsConfig, err := amqpCreateTLSConfig()
if err != nil {
return err
}
// Create AMQP URL
amqpURL := fmt.Sprintf("amqps://%s:%s", r.endpoint["host"], r.endpoint["port"])
log.Printf("Connecting to %s", amqpURL)
// Step 2: Connect with TLS
opts := &amqp.ConnOptions{
TLSConfig: tlsConfig,
SASLType: amqp.SASLTypeExternal(""),
IdleTimeout: 0,
MaxFrameSize: 65536,
ContainerID: "",
HostName: r.endpoint["host"],
Properties: nil,
}
ctx := context.Background()
client, err := amqp.Dial(ctx, amqpURL, opts)
if err != nil {
return fmt.Errorf("failed to connect: %v", err)
}
r.client = client
// Create session
session, err := client.NewSession(ctx, nil)
if err != nil {
return fmt.Errorf("failed to create session: %v", err)
}
r.session = session
// Step 3: Create receiver link using the source address
receiver, err := session.NewReceiver(ctx, r.endpoint["source"], nil)
if err != nil {
return fmt.Errorf("failed to create receiver: %v", err)
}
r.receiver = receiver
log.Println("Container reactor started")
return nil
}
func (r *Receiver) receiveMessages() {
ctx := context.Background()
for {
// Receive message with timeout
msg, err := r.receiver.Receive(ctx, nil)
if err != nil {
log.Printf("Error receiving message: %v", err)
continue
}
// Decode binary body as UTF-8
var bodyText string
if len(msg.Data) > 0 && len(msg.Data[0]) > 0 {
// Decode binary data as UTF-8 string - this handles the binary payload
bodyText = string(msg.Data[0])
} else if msg.Value != nil {
// Handle other message value types (string, bytes, etc.)
switch v := msg.Value.(type) {
case []byte:
bodyText = string(v)
case string:
bodyText = v
default:
bodyText = fmt.Sprintf("%v", v)
}
} else {
bodyText = ""
}
// Format application properties in sorted order
var propsMap map[string]interface{}
if msg.ApplicationProperties != nil {
propsMap = make(map[string]interface{})
for k, v := range msg.ApplicationProperties {
propsMap[k] = v
}
} else {
propsMap = make(map[string]interface{})
}
// Marshal with sorted keys
propsJSON, _ := json.Marshal(propsMap)
log.Printf("Message received: body='%s', properties=%s", bodyText, string(propsJSON))
// Accept the message
err = r.receiver.AcceptMessage(ctx, msg)
if err != nil {
log.Printf("Error accepting message: %v", err)
}
}
}
func (r *Receiver) close() {
if r.receiver != nil {
r.receiver.Close(context.Background())
}
if r.session != nil {
r.session.Close(context.Background())
}
if r.client != nil {
r.client.Close()
}
}
func amqpConnectAndListen(endpoint map[string]string) error {
receiver := &Receiver{
endpoint: endpoint,
}
err := receiver.connect()
if err != nil {
return err
}
defer receiver.close()
// Start receiving messages
receiver.receiveMessages()
return nil
}
private static class ReceiverHandler extends BaseHandler {
private final Map<String, String> endpoint;
private Receiver receiver;
private SSLContext sslContext;
public ReceiverHandler(Map<String, String> endpoint) {
this.endpoint = endpoint;
}
public void setSslContext(SSLContext sslContext) {
this.sslContext = sslContext;
}
@Override
public void onConnectionInit(Event event) {
logger.fine("Connection initialized");
Connection connection = event.getConnection();
connection.setHostname(endpoint.get("host"));
connection.setContainer("java-subscription-direct-example");
connection.open();
}
@Override
public void onConnectionBound(Event event) {
logger.fine("Connection bound, configuring transport");
Transport transport = event.getTransport();
if (sslContext != null) {
// Configure SASL EXTERNAL
Sasl sasl = transport.sasl();
sasl.setMechanisms("EXTERNAL");
// Configure SSL
SslDomain sslDomain = Proton.sslDomain();
sslDomain.init(SslDomain.Mode.CLIENT);
sslDomain.setPeerAuthentication(SslDomain.VerifyMode.VERIFY_PEER);
sslDomain.setSslContext(sslContext);
transport.ssl(sslDomain);
}
}
@Override
public void onConnectionRemoteOpen(Event event) {
logger.fine("Connection opened");
Connection connection = event.getConnection();
Session session = connection.session();
session.open();
}
@Override
public void onSessionRemoteOpen(Event event) {
logger.fine("Session opened");
Session session = event.getSession();
Source source = new Source();
String sourceAddress = endpoint.get("source");
source.setAddress(sourceAddress);
receiver = session.receiver(sourceAddress);
receiver.setSource(source);
Target target = new Target();
receiver.setTarget(target);
receiver.open();
}
@Override
public void onLinkRemoteClose(Event event) {
Link link = event.getLink();
logger.severe("Link remote close - State: " + link.getRemoteState());
if (link.getRemoteCondition() != null) {
logger.severe("Condition: " + link.getRemoteCondition().getCondition());
logger.severe("Description: " + link.getRemoteCondition().getDescription());
}
}
@Override
public void onLinkRemoteOpen(Event event) {
logger.fine("Receiver link opened, ready to receive messages");
receiver.flow(100); // Grant credit to receive messages
}
@Override
public void onDelivery(Event event) {
Delivery delivery = event.getDelivery();
if (delivery.isReadable() && !delivery.isPartial()) {
try {
// Read message data
byte[] messageData = new byte[delivery.available()];
receiver.recv(messageData, 0, messageData.length);
// Decode message
Message message = Message.Factory.create();
message.decode(messageData, 0, messageData.length);
// Extract body
String bodyText = "";
if (message.getBody() instanceof Data) {
Data bodyData = (Data) message.getBody();
if (bodyData.getValue() instanceof Binary) {
Binary binary = (Binary) bodyData.getValue();
bodyText = new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
} else {
logger.warning("Unexpected body type: " + bodyData.getValue().getClass().getName());
bodyText = bodyData.getValue().toString(); // Fallback to string representation
}
} else {
logger.warning("Unexpected message body type: " + message.getBody().getClass().getName());
bodyText = message.getBody().toString(); // Fallback to string representation
}
// Extract and format application properties
Map<String, Object> appProps = new HashMap<>();
if (message.getApplicationProperties() != null && message.getApplicationProperties().getValue() != null) {
appProps = (Map<String, Object>) message.getApplicationProperties().getValue();
}
// Format properties in sorted order for consistent logging
ObjectMapper mapper = new ObjectMapper();
String sortedProperties = mapper.writeValueAsString(new TreeMap<>(appProps));
logger.info(String.format("Message received: body='%s', properties=%s", bodyText, sortedProperties));
// Settle the delivery
delivery.settle();
// Grant more credit for next message
receiver.flow(1);
} catch (Exception e) {
logger.log(Level.WARNING, "Error processing message", e);
delivery.settle();
receiver.flow(1);
}
}
}
@Override
public void onTransportError(Event event) {
logger.log(Level.SEVERE, "Transport error: " + event.getTransport().getCondition());
}
}
private static SSLContext createSSLContext() throws Exception {
// Add BouncyCastle provider
Security.addProvider(new BouncyCastleProvider());
// Create SSL context
SSLContext sslContext = SSLContext.getInstance("TLSv1.3");
// Load client certificate and key
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(null);
// Parse certificates and key using BouncyCastle
List<java.security.cert.Certificate> certChain = new ArrayList<>();
PrivateKey privateKey = null;
try (PEMParser pemParser = new PEMParser(new FileReader(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM))) {
Object object;
JcaPEMKeyConverter keyConverter = new JcaPEMKeyConverter().setProvider("BC");
JcaX509CertificateConverter certConverter = new JcaX509CertificateConverter().setProvider("BC");
while ((object = pemParser.readObject()) != null) {
if (object instanceof X509CertificateHolder) {
certChain.add(certConverter.getCertificate((X509CertificateHolder) object));
} else if (object instanceof PEMKeyPair) {
privateKey = keyConverter.getPrivateKey(((PEMKeyPair) object).getPrivateKeyInfo());
}
}
}
// Add certificate chain and key to keystore
if (privateKey != null && !certChain.isEmpty()) {
keyStore.setKeyEntry("client", privateKey, new char[0], certChain.toArray(new java.security.cert.Certificate[0]));
} else {
throw new IllegalStateException("Failed to load client certificate and key");
}
// Initialize key manager
KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
kmf.init(keyStore, new char[0]);
// Load CA certificate
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null);
CertificateFactory cf = CertificateFactory.getInstance("X.509");
try (FileInputStream fis = new FileInputStream(CA_CERTIFICATE_PEM)) {
java.security.cert.Certificate caCert = cf.generateCertificate(fis);
trustStore.setCertificateEntry("ca", caCert);
}
// Initialize trust manager
TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
tmf.init(trustStore);
// Initialize SSL context
sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), new SecureRandom());
return sslContext;
}
private static void amqpConnectAndListen(Map<String, String> endpoint) throws Exception {
// Configure SSL
SSLContext sslContext = createSSLContext();
// Create handler
ReceiverHandler handler = new ReceiverHandler(endpoint);
handler.setSslContext(sslContext);
// Create reactor
Reactor reactor = Proton.reactor(handler);
// Connect to host with SSL and SASL configuration
String host = endpoint.get("host");
int port = Integer.parseInt(endpoint.get("port"));
// Use reactor's connection method with proper SSL/SASL setup
reactor.connectionToHost(host, port, handler);
// Run reactor
reactor.run();
}
function amqpConnectAndListen(endpoint) {
const container = rhea.create_container();
// Connection options with TLS configuration
const connectionOptions = {
host: endpoint.host,
port: parseInt(endpoint.port),
transport: 'tls',
// Client certificate authentication
key: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
cert: fs.readFileSync(ACTOR_CERTIFICATE_CHAIN_AND_KEY_PEM),
// CA certificate
ca: [fs.readFileSync(CA_CERTIFICATE_PEM)],
// Connection settings
reconnect: false,
initial_reconnect_delay: 0,
max_reconnect_delay: 0,
idle_time_out: 0,
container_id: 'javascript-subscription-direct-example',
// Enable SASL EXTERNAL for client certificate authentication
enable_sasl_external: true
};
let receiver = null;
container.on('connection_open', function (context) {
console.log(`${new Date().toISOString()} DEBUG Connection opened`);
console.log(`${new Date().toISOString()} DEBUG Creating receiver for source: ${endpoint.source}`);
// Create receiver
receiver = context.connection.open_receiver({
source: {
address: endpoint.source
}
});
});
container.on('receiver_open', function (context) {
console.log(`${new Date().toISOString()} DEBUG Receiver link opened, ready to receive messages`);
});
container.on('message', function (context) {
// Decode binary body as UTF-8
let bodyText = '';
if (context.message.body !== undefined) {
if (Buffer.isBuffer(context.message.body)) {
// Direct buffer case
bodyText = context.message.body.toString('utf8');
} else if (Array.isArray(context.message.body)) {
// Array of data sections - concatenate all buffers
for (const section of context.message.body) {
if (Buffer.isBuffer(section)) {
bodyText += section.toString('utf8');
}
}
} else if (typeof context.message.body === 'object' && context.message.body.constructor && context.message.body.constructor.name === 'Section') {
// AMQP Section object with content buffer
if (Buffer.isBuffer(context.message.body.content)) {
bodyText = context.message.body.content.toString('utf8');
} else {
bodyText = String(context.message.body.content);
}
} else if (typeof context.message.body === 'object' && context.message.body.constructor && context.message.body.constructor.name === 'Buffer') {
// Sometimes Buffer objects appear as generic objects
bodyText = Buffer.from(context.message.body).toString('utf8');
} else {
// Fallback to string conversion
bodyText = String(context.message.body);
}
}
// Format application properties in sorted order
let appProps = {};
if (context.message.application_properties) {
appProps = context.message.application_properties;
}
const sortedPropsJson = JSON.stringify(appProps, Object.keys(appProps).sort());
console.log(`${new Date().toISOString()} INFO Message received: body='${bodyText}', properties=${sortedPropsJson}`);
// Accept the message
context.delivery.accept();
});
container.on('connection_close', function (context) {
console.log(`${new Date().toISOString()} INFO Connection closed`);
if (context.connection && context.connection.error) {
console.log(`${new Date().toISOString()} SEVERE Connection close error:`, context.connection.error);
}
});
container.on('connection_error', function (context) {
const error = context.connection.error || context.error;
console.log(`${new Date().toISOString()} SEVERE Connection error:`, error);
});
container.on('error', function (context) {
console.log(`${new Date().toISOString()} SEVERE Error:`, context.error);
});
container.on('disconnected', function (context) {
console.log(`${new Date().toISOString()} SEVERE Disconnected:`, context.error || 'Unknown error');
if (context.connection && context.connection.error) {
console.log(`${new Date().toISOString()} SEVERE Disconnection details:`, context.connection.error);
}
});
// Connect
console.log(`${new Date().toISOString()} INFO Connecting to ${endpoint.host}:${endpoint.port}`);
container.connect(connectionOptions);
}
Explanation
AMQP 1.0 client implementation with SSL/TLS configuration using:
- Python: python-qpid-proton library
- .NET: AMQPNetLite library
- Go: Azure/go-amqp library
- Java: Apache Qpid Proton-J library
- JavaScript: rhea library
Direct connection and subscription
def direct_subscribe():
try:
# Create endpoint from environment variables
endpoint = {
"host": ENDPOINT_HOST,
"port": ENDPOINT_PORT,
"source": ENDPOINT_SOURCE
}
logging.info("Using pre-known endpoint %s" % endpoint)
amqp_connect_and_listen(endpoint)
except Exception as e:
logging.warning("An exception occurred while running direct_subscribe: %s" % e)
private static async Task DirectSubscribeAsync()
{
try
{
// Create endpoint from environment variables
var endpoint = new SubscriptionEndpoint
{
Host = ENDPOINT_HOST,
Port = int.Parse(ENDPOINT_PORT),
Source = ENDPOINT_SOURCE
};
LogInfo($"Using pre-known endpoint {JsonSerializer.Serialize(endpoint)}");
await AmqpConnectAndListenAsync(endpoint);
}
catch (Exception e)
{
LogError($"An exception occurred while running DirectSubscribe: {e.Message}");
}
}
func directSubscribe() {
// Create endpoint from environment variables
endpoint := map[string]string{
"host": ENDPOINT_HOST,
"port": ENDPOINT_PORT,
"source": ENDPOINT_SOURCE,
}
log.Printf("Using pre-known endpoint %v", endpoint)
err := amqpConnectAndListen(endpoint)
if err != nil {
log.Printf("An exception occurred while running direct_subscribe: %v", err)
}
}
private static void directSubscribe() {
try {
// Create endpoint from environment variables
Map<String, String> endpoint = new HashMap<>();
endpoint.put("host", ENDPOINT_HOST);
endpoint.put("port", ENDPOINT_PORT);
endpoint.put("source", ENDPOINT_SOURCE);
logger.info("Using pre-known endpoint " + endpoint);
amqpConnectAndListen(endpoint);
} catch (Exception e) {
logger.log(Level.SEVERE, "An exception occurred while running direct_subscribe", e);
}
}
function directSubscribe() {
try {
// Create endpoint from environment variables
const endpoint = {
host: ENDPOINT_HOST,
port: ENDPOINT_PORT,
source: ENDPOINT_SOURCE
};
console.log(`${new Date().toISOString()} INFO Using pre-known endpoint ${JSON.stringify(endpoint)}`);
amqpConnectAndListen(endpoint);
} catch (error) {
console.log(`${new Date().toISOString()} WARNING An exception occurred while running direct_subscribe: ${error.message}`);
}
}
Explanation
Direct connection to AMQP endpoint using pre-configured host, port, and source address
Full examples
Language | Location | Description |
---|---|---|
Python | examples/subscription-direct/python | Direct AMQP connection with known endpoint |
.NET | examples/subscription-direct/dotnet | Direct AMQP connection with known endpoint |
Go | examples/subscription-direct/go | Direct AMQP connection with known endpoint |
Java | examples/subscription-direct/java | Direct AMQP connection with known endpoint |
JavaScript | examples/subscription-direct/javascript | Direct AMQP connection with known endpoint |