Connecting Java and Python with RabbitMQ

I recently wrote a web crawler project in Python. For convenience I built a control console in Java, and then used RabbitMQ to tie the two together.

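First, the Java side. The producer and the consumer are both singletons, and the consumer starts consuming automatically when Tomcat starts. Without further ado, here is the code.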

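// Consumer
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import javax.swing.JOptionPane;

public class ScrapyRabbitCon {
    // Queue name: Python publishes to this queue, Java consumes from it
    private static final String QUEUE_NAME = "pythonjava";
    private static ScrapyRabbitCon rabbitmq;

    // synchronized so that concurrent callers cannot create two instances
    public static synchronized ScrapyRabbitCon getRabbit() {
        if (rabbitmq == null) {
            try {
                rabbitmq = new ScrapyRabbitCon();
            } catch (IOException | TimeoutException e) {
                // Connecting failed; the caller gets null back
                e.printStackTrace();
            }
        }
        return rabbitmq;
    }

    private ScrapyRabbitCon() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        // factory.setConnectionTimeout(2);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Arguments: not durable, not exclusive, not auto-delete, no extra arguments
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                // Show the received message in a Swing dialog
                JOptionPane.showMessageDialog(null, message, "ERROR", JOptionPane.ERROR_MESSAGE);
                System.out.println(message);
            }
        };
        // autoAck = true: messages are acknowledged as soon as they are delivered
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}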

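// Producer
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
// JSONObject comes from the project's JSON library. The original post does not
// say which one, so import it to match your project.

public class ScrapyRabbitPro {
    // Queue name: Java publishes to this queue, Python consumes from it
    private static final String QUEUE_NAME = "javapython";
    private Channel channel;
    private static ScrapyRabbitPro sendRabbit;

    // synchronized so that concurrent callers cannot create two instances
    public static synchronized ScrapyRabbitPro getSendRabbit() {
        if (sendRabbit == null) {
            try {
                sendRabbit = new ScrapyRabbitPro();
            } catch (IOException | TimeoutException e) {
                // Connecting failed; the caller gets null back
                e.printStackTrace();
            }
        }
        return sendRabbit;
    }

    private ScrapyRabbitPro() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
        // factory.setConnectionTimeout(2);
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    }

    public void send(JSONObject message) {
        try {
            // Default exchange ("") routes directly to the queue named by the routing key
            channel.basicPublish("", QUEUE_NAME, null,
                    message.toString().getBytes(StandardCharsets.UTF_8));
            System.out.println("Producer Send '" + message + "'");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}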

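The above is the Java code for RabbitMQ. The producer wraps a send method; calling send publishes the given JSON message to the javapython queue, where the Python consumer picks it up.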
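The Python consumer reads two fields from each command, taskid and method, where method is 'start' or 'stop'. As a rough illustration of the payload that send would put on the javapython queue, here is a minimal Python sketch that encodes and decodes such a message. The helper names encode_command and decode_command and the sample task id are made up for this example, and the real messages may well carry more fields.

```python
import json


def encode_command(taskid, method):
    """Build the UTF-8 JSON bytes for one control command (hypothetical helper)."""
    if method not in ("start", "stop"):
        raise ValueError("unknown method: %r" % (method,))
    return json.dumps({"taskid": taskid, "method": method}).encode("utf-8")


def decode_command(body):
    """Turn a raw message body back into a (taskid, method) pair."""
    js = json.loads(body.decode("utf-8"))
    return js.get("taskid"), js.get("method")


if __name__ == "__main__":
    raw = encode_command("task-001", "start")  # "task-001" is a made-up id
    print(decode_command(raw))
```

Keeping the encoding in one place makes it easier to check that both sides agree on field names and on UTF-8.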

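import json
import threading

import pika

# get_taskinfo, get_taskstatus, writeconf, go and ki are helpers from the
# crawler project and are not shown in this post. get_or_save_mq is defined
# in the producer code further down.


def callback(ch, method, properties, body):  # Callback that receives each message sent by the producer
    global TASKINFO, TASKSTATUS
    body = body.decode('utf-8')
    js = json.loads(body)
    taskid = js.get("taskid")
    TASKINFO = get_taskinfo(taskid)
    TASKSTATUS = get_taskstatus(taskid)
    mq = get_or_save_mq("pythonjava")
    if js.get("method") == 'start':
        writeconf(taskid)
        t1 = threading.Thread(target=go, args=(mq,))
        t1.start()
    if js.get("method") == 'stop':
        t2 = threading.Thread(target=ki, args=(mq,))
        t2.start()


credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='javapython')

# This is the pika 0.x signature. From pika 1.0 on, the call is written as
# channel.basic_consume(queue='javapython', on_message_callback=callback, auto_ack=True)
channel.basic_consume(callback,
                      queue='javapython',
                      no_ack=True)
print('[consumer] waiting for msg.')
channel.start_consuming()  # Start the blocking loop that fetches messages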

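The above is the Python consumer. For now it is written as a plain script and serves as the entry point of the whole crawler: it listens for commands from the Java control console and uses them to control the crawler run.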
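One way to try out this command handling without a running broker is to separate the dispatch logic from the pika wiring. The sketch below is only an illustration under that assumption: make_callback and the handlers mapping are hypothetical names, and the handlers stand in for whatever the real script does on 'start' and 'stop'.

```python
import json


def make_callback(handlers):
    """Return a pika-style callback that routes each command to a handler.

    `handlers` maps a method name such as "start" or "stop" to a function
    that takes the task id. Both names are illustrative, not from the post.
    """
    def callback(ch, method, properties, body):
        try:
            js = json.loads(body.decode("utf-8"))
        except ValueError:
            # Bad UTF-8 or bad JSON: skip the message instead of
            # crashing the consumer loop
            return False
        if not isinstance(js, dict):
            return False  # valid JSON, but not a command object
        handler = handlers.get(js.get("method"))
        if handler is None:
            return False  # unknown command
        handler(js.get("taskid"))
        return True
    return callback


if __name__ == "__main__":
    seen = []
    cb = make_callback({
        "start": lambda taskid: seen.append(("start", taskid)),
        "stop": lambda taskid: seen.append(("stop", taskid)),
    })
    cb(None, None, None, b'{"taskid": "task-001", "method": "start"}')
    cb(None, None, None, b'not json')  # ignored
    print(seen)
```

The returned function has the same four-argument shape as the callback above, so it could be passed to basic_consume unchanged.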

import pika

# Cache of InitMq instances, keyed by queue name
MQ_DICT = {}


def get_or_save_mq(queue_name):
    """Return the cached InitMq for queue_name, creating it on first use."""
    mq = MQ_DICT.get(queue_name)
    if mq:
        return mq
    else:
        mq = InitMq(queue_name)
        MQ_DICT[queue_name] = mq
        return mq


class InitMq:
    def __init__(self, uuid):
        # uuid is used as both the queue name and the routing key
        queue = uuid
        print("*********** Initializing MQ driver *************")
        credentials = pika.PlainCredentials('guest', 'guest')
        connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
        self.channel = connection.channel()
        self.channel.queue_declare(queue=queue)
        self.routing_key = queue

    def send_data(self, body):
        # body is a str; it is sent as UTF-8 bytes through the default exchange
        self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=body.encode('utf-8'))

The above is the Python producer. It sends the error messages and notices produced by the crawler to the Java control console.
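One caveat: the consumer callback hands the same mq object to the go and ki threads, and pika's documentation advises against sharing one connection across threads. If those threads call send_data themselves, one possible workaround is to funnel all outgoing messages through a single sender thread that owns the connection. The sketch below only illustrates that idea. QueuedSender and publisher_factory are hypothetical names, and the factory stands in for something like lambda: InitMq("pythonjava").send_data.

```python
import queue
import threading


class QueuedSender:
    """Funnel messages from many threads through one sender thread.

    `publisher_factory` is called inside the sender thread and must return
    a function that publishes one message, so the connection is created
    and used by that thread only.
    """
    _STOP = object()

    def __init__(self, publisher_factory):
        self._factory = publisher_factory
        self._outbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send_data(self, body):
        # Safe to call from any thread: it only touches the queue
        self._outbox.put(body)

    def close(self):
        # Messages queued before this call are still published first
        self._outbox.put(self._STOP)
        self._thread.join()

    def _run(self):
        publish = self._factory()
        while True:
            body = self._outbox.get()
            if body is self._STOP:
                break
            publish(body)


if __name__ == "__main__":
    sender = QueuedSender(lambda: print)  # print stands in for a real publisher
    sender.send_data("crawler error: example message")
    sender.close()
```

Because QueuedSender exposes the same send_data method as InitMq, worker code that only calls send_data would not need to change.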


Reposted from blog.csdn.net/mrliqifeng/article/details/86772325