博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ (1) 消息的发送与接收
阅读量:6157 次
发布时间:2019-06-21

本文共 4508 字,大约阅读时间需要 15 分钟。

  hot3.png

本例子为简单的消息发送与接收,主机地址与端口均采用默认值。

如Send1 ,  声明Queue后消息被注入通道,(实际msg仍然先注入了默认的Exchange)

消息发送端:

package sunf.rabbitMQTest;import java.io.IOException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.MessageProperties;/** * MQ消息发送者测试1     *  * 对应 :SomeConsumerTest *  * 本例中信息被直接从channel流向queue。 * 信息会被所有消费该队列信息的客户---均分。 *  * @author SUNF * */public class Send1 {    public static void main(String[] args) throws IOException, Exception {        ConnectionFactory cf = new ConnectionFactory();        cf.setHost(RabbitMQConstants.VHOST_IP);        Connection conn = cf.newConnection(); //新连接        Channel channel = conn.createChannel();        //声明此队列并且持久化          channel.queueDeclare(RabbitMQConstants.QUEUE_NAME, true/*持久化*/, false, false, null);        for(int i=0;i<10;i++){            String message = "能量+" + i;            //持久化消息            channel.basicPublish("", RabbitMQConstants.QUEUE_NAME,                      MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());                        System.out.println("[消息已发送--------------->>" + message);        }                channel.close();        conn.close();            }}

运行结果: 

[消息已发送--------------->>能量+0[消息已发送--------------->>能量+1[消息已发送--------------->>能量+2[消息已发送--------------->>能量+3[消息已发送--------------->>能量+4[消息已发送--------------->>能量+5[消息已发送--------------->>能量+6[消息已发送--------------->>能量+7[消息已发送--------------->>能量+8[消息已发送--------------->>能量+9

接收端:

接收端模拟四个接收者同时接受一个Queue的信息,结果信息被均分到4个客户。

package sunf.rabbitMQTest;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.QueueingConsumer;/** * MQ单通道,多用户模拟 *  * @author SUNF * */public class ConsumerTest1 {        private static ConnectionFactory factory;    private static Connection connection;    public static void main(String[] args) {        //消费者1        ConsumerTest1.createConsumer("超人");        ConsumerTest1.createConsumer("葫芦娃");        ConsumerTest1.createConsumer("闪电侠");        ConsumerTest1.createConsumer("蜘蛛侠");    }    public static void createConsumer(final String consumerName){        new Thread(){            public void run(){                try {                    ConsumerTest1.receiveMessage(consumerName);                } catch (Exception e) {                    e.printStackTrace();                }            }        }.start();         }            /**     * @param consumerName 消费者名称     * @throws IOException     * @throws Exception     */    public static void receiveMessage(String consumerName) throws IOException, Exception{        if(factory == null){            synchronized(ConsumerTest1.class){                if(factory == null){                    factory = new ConnectionFactory();                    factory.setHost(RabbitMQConstants.VHOST_IP);                         connection = factory.newConnection();                }            }        }                Channel channel = connection.createChannel();        channel.queueDeclare(RabbitMQConstants.QUEUE_NAME, true, false, false, null);        System.out.println("["+consumerName+"] 准备完毕,可以接收消息------");        channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者                  QueueingConsumer consumer = new QueueingConsumer(channel);        channel.basicConsume(RabbitMQConstants.QUEUE_NAME, false, consumer);                while(true){            QueueingConsumer.Delivery dy = consumer.nextDelivery();            String msg = new String(dy.getBody());            System.out.println("("+ consumerName +") 收到的消息------->> " + msg);                        channel.basicAck(dy.getEnvelope().getDeliveryTag(), false);//下一个消息          }    }}

结果显示,队列中的10条信息被均分到不同的接收者上。

注意:由于线程开启速度、顺序和电脑环境原因,显示结果可能并不一定与实际相符,测试时尽可能先开启接收端,再启动发送端发布消息。

[超人] 准备完毕,可以接收消息------[葫芦娃] 准备完毕,可以接收消息------[蜘蛛侠] 准备完毕,可以接收消息------[闪电侠] 准备完毕,可以接收消息------(葫芦娃) 收到的消息------->> 能量+0(闪电侠) 收到的消息------->> 能量+1(蜘蛛侠) 收到的消息------->> 能量+2(超人) 收到的消息------->> 能量+3(葫芦娃) 收到的消息------->> 能量+4(闪电侠) 收到的消息------->> 能量+5(蜘蛛侠) 收到的消息------->> 能量+6(超人) 收到的消息------->> 能量+7(葫芦娃) 收到的消息------->> 能量+8(闪电侠) 收到的消息------->> 能量+9

 常量类

package sunf.rabbitMQTest;//public class RabbitMQConstants {    public static final String VHOST_IP = "127.0.0.1";    public static final String QUEUE_NAME = "hello2";    public static final String EXC_NAME = "java";}

 

转载于:https://my.oschina.net/dlam/blog/803268

你可能感兴趣的文章
Silverlight 如何手动打包xap
查看>>
禁用ViewState
查看>>
Android图片压缩(质量压缩和尺寸压缩)
查看>>
nilfs (a continuent snapshot file system) used with PostgreSQL
查看>>
【SICP练习】150 练习4.6
查看>>
HTTP缓存应用
查看>>
KubeEdge向左,K3S向右
查看>>
DTCC2013:基于网络监听数据库安全审计
查看>>
CCNA考试要点大搜集(二)
查看>>
ajax查询数据库时数据无法更新的问题
查看>>
Kickstart 无人职守安装,终于搞定了。
查看>>
linux开源万岁
查看>>
linux/CentOS6忘记root密码解决办法
查看>>
25个常用的Linux iptables规则
查看>>
集中管理系统--puppet
查看>>
分布式事务最终一致性常用方案
查看>>
Exchange 2013 PowerShell配置文件
查看>>
JavaAPI详解系列(1):String类(1)
查看>>
HTML条件注释判断IE<!--[if IE]><!--[if lt IE 9]>
查看>>
发布和逸出-构造过程中使this引用逸出
查看>>