message expiration not work #24

Open
CrazyMan2 opened this issue Oct 13, 2019 · 13 comments

Comments

@CrazyMan2

import {
    Connection,
    Queue,
    Exchange
} from 'react-native-rabbitmq';

let connection: Connection = null;
let queue: Queue;
let exchange: Exchange;
let mExchangeName = "weixinFace";
let mCallFun = null; // callback function

export function setCallFun(callFun) {
    mCallFun = callFun;
}

export function connectRabbitMq(channel) {
    const config = {
        host: 'xxxxxx',
        port: 5672,
        username: 'xxxx', // your rabbitmq management username
        password: 'xxxx', // your rabbitmq management password
        virtualhost: '/',
    };

    connection = new Connection(config);
    connection.connect();
    let connected = false;

    connection.on('connected', (event) => {
        console.log("connected");
        queue = new Queue(connection, {
            name: channel,
            passive: false,
            durable: true,
            exclusive: false,
            // consumer_arguments: { 'x-message-ttl': 1000 },
            // Arguments:{'x-message-ttl': 1000 },
            // ttl:1000,
            autoDelete: false,
        });
        exchange = new Exchange(connection, {
            name: mExchangeName,
            type: 'direct',
            durable: true,
            autoDelete: false,
            internal: false,
        });
        queue.bind(exchange, channel);
        queue.on('message', (data) => {
            console.log(data);
            if (mCallFun) mCallFun(data);
        });
    });
    connection.on('error', event => {
        connected = false;
        console.log(connection);
        console.log(event);
        console.log("you are not connected");
    });
}

export function closeRabbitMq() {
    // connection.close();
}

export function sendMessage(message, channel) {
    let properties = { expiration: 1000 };
    exchange.publish(message, channel, properties);
}

Neither the queue TTL nor the expiration property passed to exchange.publish works. Please help me.
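
For reference, when RabbitMQ sees both a per-queue TTL (x-message-ttl) and a per-message expiration, it applies the lower of the two. A minimal sketch of that rule follows; effectiveTtlMs is a hypothetical helper for illustration, not part of react-native-rabbitmq, and it says nothing about whether the library actually forwards either value to the broker:

```typescript
// Hypothetical helper (not a react-native-rabbitmq API).
// Returns the TTL in milliseconds that the broker would apply to a message,
// or undefined when neither a queue TTL nor a message expiration is set.
function effectiveTtlMs(
  queueTtlMs?: number,
  messageExpirationMs?: number,
): number | undefined {
  if (queueTtlMs === undefined) return messageExpirationMs;
  if (messageExpirationMs === undefined) return queueTtlMs;
  // Both are set: the broker uses the lower value.
  return Math.min(queueTtlMs, messageExpirationMs);
}

console.log(effectiveTtlMs(1000, 5000));
console.log(effectiveTtlMs(undefined, 1000));
```

So with both values set to 1000 as in the code above, a message should expire after roughly one second once either setting reaches the broker.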

@CrazyMan2
Author

queue = new Queue(connection, {
    name: channel,
    passive: false,
    durable: true,
    exclusive: false,
    consumer_arguments: { 'x-message-ttl': 1000 },
    // Arguments:{'x-message-ttl': 1000 },
    // ttl:1000,
    autoDelete: false,
});
image
x-message-ttl is not applied to the queue.

@mrozkosz

I have the same problem. Does anyone know how to solve it?

@timhonders
Contributor

iOS or Android?

@timhonders
Contributor

Can you try this?

queue = new Queue(connection, {
    name: channel,
    passive: false,
    durable: true,
    exclusive: false,
    consumer_arguments: { },
    autoDelete: false,
}, {
    'x-message-ttl': 1000
});

@mrozkosz

I have this problem on both platforms. On Android I changed a variable in the file RabbitMqQueue.java.
Before:
this.channel.queueDeclare(this.name, this.durable, this.exclusive, this.autodelete, args);
Now (it works):
this.channel.queueDeclare(this.name, this.durable, this.exclusive, this.autodelete, consumer_args);
I still don't know how to solve this problem on iOS.

[Screenshot, 2019-12-19 18:22:43]

@timhonders
Contributor

OK, that's not the way to do it. I will take a look at this.

@timhonders
Contributor

This example works on Android:

this.queue = new Queue(this.connection, {
    name: 'test5.testqueue',
    passive: false,
    durable: true,
    exclusive: false,
    consumer_arguments: {
        'x-priority': this.priority,
    },
    buffer_delay: 600,
    autoBufferAck: true
}, {
    'x-queue-mode': 'lazy',
    'x-message-ttl': 1000,
    'x-expires': 1800000
});

rmq

@mrozkosz

Is there any solution for iOS?

@CrazyMan2 CrazyMan2 reopened this Jan 2, 2020
@Ro392

Ro392 commented Jan 4, 2020

Hello friend, do you know how to solve this error on iOS? It is very important!

@Ro392

Ro392 commented Jan 7, 2020

On iOS the arguments are sent, but the queue that gets created doesn't have them.
[Screenshot, 2020-01-07 12:09:35]

[Screenshot, 2020-01-07 12:03:58]

@timhonders
Contributor

Can you show your code?

@Ro392

Ro392 commented Jan 26, 2020

[Screenshot, 2020-01-26 18:16:10]

@foo-io

foo-io commented Dec 10, 2023

Hi! Replace the code in node_modules/react-native-rabbit/ios/RCTReactNativeRabbitMq/RabbitMqQueue.m with this:

#import "RabbitMqQueue.h"

@interface RabbitMqQueue ()
    @property (nonatomic, readwrite) NSString *name;
    @property (nonatomic, readwrite) NSDictionary *config;
    @property (nonatomic, readwrite) RMQQueue *queue; 
    @property (nonatomic, readwrite) id<RMQChannel> channel;
    @property (nonatomic, readwrite) RMQQueueDeclareOptions options;
    @property (nonatomic, readwrite) RCTBridge *bridge;
    @property (nonatomic, readwrite) RMQConsumer *consumer;
@end

@implementation RabbitMqQueue

RCT_EXPORT_MODULE();

-(id)initWithConfig:(NSDictionary *)config channel:(id<RMQChannel>)channel {
    if (self = [super init]) {

        self.config = config;
        self.channel = channel;
        self.name = [config objectForKey:@"name"];

        self.options = RMQQueueDeclareNoOptions;
    
        if ([config objectForKey:@"passive"] != nil && [[config objectForKey:@"passive"] boolValue]){
            self.options = self.options | RMQQueueDeclarePassive;
        }

        if ([config objectForKey:@"durable"] != nil && [[config objectForKey:@"durable"] boolValue]){
            self.options = self.options | RMQQueueDeclareDurable;
        }

        if ([config objectForKey:@"exclusive"] != nil && [[config objectForKey:@"exclusive"] boolValue]){
            self.options = self.options | RMQQueueDeclareExclusive;
        }

        if ([config objectForKey:@"autoDelete"] != nil && [[config objectForKey:@"autoDelete"] boolValue]){
            // Use the queue option here; the exchange flag has a different bit value.
            self.options = self.options | RMQQueueDeclareAutoDelete;
        }

        if ([config objectForKey:@"NoWait"] != nil && [[config objectForKey:@"NoWait"] boolValue]){
            self.options = self.options | RMQQueueDeclareNoWait;
        }

        NSMutableDictionary *tmp_arguments = [[NSMutableDictionary alloc]init];
        if ([config objectForKey:@"consumer_arguments"] != nil){

            NSDictionary *consumer_arguments = [config objectForKey:@"consumer_arguments"];

            if ([consumer_arguments objectForKey:@"x-message-ttl"] != nil){
                NSNumber *ttl = [consumer_arguments objectForKey:@"x-message-ttl"];
                // A signed short would cap the TTL at 32767 ms, so use a 32-bit value as for x-expires.
                [tmp_arguments setObject:[[RMQSignedLong alloc] init:[ttl intValue]] forKey:@"x-message-ttl"];
            }
            
            if ([consumer_arguments objectForKey:@"x-expires"] != nil){
                NSNumber *expires = [consumer_arguments objectForKey:@"x-expires"];
                [tmp_arguments setObject:[[RMQSignedLong alloc] init:[expires intValue]] forKey:@"x-expires"];
            }

            if ([consumer_arguments objectForKey:@"x-max-length"] != nil){
                NSNumber *maxLength = [consumer_arguments objectForKey:@"x-max-length"];
                [tmp_arguments setObject:[[RMQSignedLong alloc] init:[maxLength intValue]] forKey:@"x-max-length"];
            }

            if ([consumer_arguments objectForKey:@"x-priority"] != nil){
                NSNumber *xpriority = [consumer_arguments objectForKey:@"x-priority"];
                [tmp_arguments setObject:[[RMQSignedShort alloc] init:[xpriority integerValue]] forKey:@"x-priority"];
            }

            if ([consumer_arguments objectForKey:@"x-single-active-consumer"] != nil){
                BOOL active = [[consumer_arguments valueForKey:@"x-single-active-consumer"] boolValue];
                [tmp_arguments setObject:[[RMQBoolean alloc] init:active] forKey:@"x-single-active-consumer"];
            }

        }

        RMQBasicConsumeOptions consumer_options = RMQBasicConsumeNoOptions;

        RMQTable *arguments = [[RMQTable alloc] init:tmp_arguments];

        self.queue = [self.channel queue:self.name options:self.options arguments:tmp_arguments];

        self.consumer = [self.queue subscribe:consumer_options
                    arguments:arguments
                    handler:^(RMQMessage * _Nonnull message) {

            NSString *body = [[NSString alloc] initWithData:message.body encoding:NSUTF8StringEncoding];

            //[self.channel ack:message.deliveryTag];

            [EventEmitter emitEventWithName:@"RabbitMqQueueEvent" 
                body:@{
                    @"name": @"message", 
                    @"queue_name": self.name, 
                    @"message": body, 
                    @"routingKey": message.routingKey, // Will be deprecated
                    @"routing_key": message.routingKey, 
                    @"exchange": message.exchangeName,
                    @"consumer_tag": message.consumerTag, 
                    @"delivery_tag": message.deliveryTag
                }
            ];
    
        }];

    }
    return self;
}

-(void) bind:(RMQExchange *)exchange routing_key:(NSString *)routing_key {
    if ([routing_key length] == 0){
        [self.queue bind:exchange];
    }else{
        [self.queue bind:exchange routingKey:routing_key];
    }
}

-(void) unbind:(RMQExchange *)exchange routing_key:(NSString *)routing_key {
    if ([routing_key length] == 0){
        [self.queue unbind:exchange];
    }else{
        [self.queue unbind:exchange routingKey:routing_key];
    }
}

-(void) delete {
    [self.queue delete:self.options];
}

-(void) ack:(NSNumber *)deliveryTag {
    [self.channel ack:deliveryTag];
}

-(void) cancelConsumer {
    [self.consumer cancel];
}

@end

And an example of how to use it:

let queue = new Queue(connection, {
    name: 'queueName',
    passive: false,
    durable: true,
    exclusive: false,
    consumer_arguments: {
        'x-expires': 1800000,
        'x-max-length': 1,
    },
});
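
The Objective-C patch above only reads five keys from consumer_arguments (x-message-ttl, x-expires, x-max-length, x-priority and x-single-active-consumer); any other key, such as x-queue-mode, is not forwarded. A minimal sketch of a check that lists the keys that would be dropped; droppedArguments and FORWARDED_KEYS are hypothetical names for illustration, not part of react-native-rabbitmq:

```typescript
// Keys that the patched RabbitMqQueue.m above copies from consumer_arguments.
const FORWARDED_KEYS: string[] = [
  'x-message-ttl',
  'x-expires',
  'x-max-length',
  'x-priority',
  'x-single-active-consumer',
];

// Hypothetical helper: returns the argument keys that the patch would
// not forward, in the order they appear in the input object.
function droppedArguments(consumerArguments: Record<string, unknown>): string[] {
  return Object.keys(consumerArguments).filter(
    (key) => FORWARDED_KEYS.indexOf(key) === -1,
  );
}

console.log(droppedArguments({ 'x-expires': 1800000, 'x-queue-mode': 'lazy' }));
```

Running a check like this before declaring the queue makes it easier to notice when an argument is missing on the broker side because the native code never sent it.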
