一、生产者代码:
自行导入相关依赖包或相关依赖
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个dirent模式的交换机
channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
//声明一个非持久化自动删除的队列
channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
//将绑定到改交换机
channel.queueBind("queue_name","exchange_name","route_key");
//声明一个消息头部
Map<String,Object> header=new HashMap<>();
AMQP.BasicProperties.Builder b= new AMQP.BasicProperties.Builder();
header.put("charset","utf-8");
b.headers(header);
AMQP.BasicProperties bp=b.build();
//将消息发出去
channel.basicPublish("exchange_name","route_key",false,bp,"test3".getBytes());
二、消费者代码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("username");
factory.setPort(5672);//注意这里的端口与管理插件的端口不一样
factory.setPassword("pwd");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//声明一个dirent模式的交换机
channel.exchangeDeclare("exchange_name",BuiltinExchangeType.DIRECT,true);
//声明一个非持久化自动删除的队列
channel.queueDeclare("queue_name",false,false,true,null);//如果该队列不在被使用就删除他 zhe
//将绑定到改交换机
channel.queueBind("queue_name","exchange_name","route_key");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume("queue_name", true, consumer);