diff --git a/README.md b/README.md index 52c5119..def195a 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,7 @@ The following options are allowed: - `pubClient`: optional, the redis client to publish events on - `subClient`: optional, the redis client to subscribe to events on - `requestsTimeout`: optional, after this timeout the adapter will stop waiting from responses to request (`1000ms`) +- `withChannelMultiplexing`: optional, whether channel multiplexing is enabled (a new subscription will be trigggered for each room) (`true`) If you decide to supply `pubClient` and `subClient`, make sure you use [node_redis](https://github.com/mranney/node_redis) as a client or one diff --git a/index.js b/index.js index c5e3a3f..c6fa2ce 100644 --- a/index.js +++ b/index.js @@ -49,6 +49,7 @@ function adapter(uri, opts){ var prefix = opts.key || 'socket.io'; var subEvent = opts.subEvent || 'message'; var requestsTimeout = opts.requestsTimeout || 1000; + var withChannelMultiplexing = false !== opts.withChannelMultiplexing; // init clients if needed function createClient(redis_opts) { @@ -79,6 +80,7 @@ function adapter(uri, opts){ this.uid = uid; this.prefix = prefix; this.requestsTimeout = requestsTimeout; + this.withChannelMultiplexing = withChannelMultiplexing; this.channel = prefix + '#' + nsp.name + '#'; this.requestChannel = prefix + '-request#' + this.nsp.name + '#'; @@ -279,7 +281,7 @@ function adapter(uri, opts){ if (!(remote || (opts && opts.flags && opts.flags.local))) { var self = this; var msg = msgpack.encode([uid, packet, opts]); - if (opts.rooms) { + if (self.withChannelMultiplexing && opts.rooms) { opts.rooms.forEach(function(room) { var chnRoom = self.channel + room + '#'; pub.publish(chnRoom, msg); @@ -304,6 +306,11 @@ function adapter(uri, opts){ debug('adding %s to %s ', id, room); var self = this; Adapter.prototype.add.call(this, id, room); + + if (!this.withChannelMultiplexing) { + if (fn) fn(null); + return; + } var channel = this.channel + room + '#'; sub.subscribe(channel, function(err){ if (err) { @@ -331,7 +338,7 @@ function adapter(uri, opts){ var hasRoom = this.rooms.hasOwnProperty(room); Adapter.prototype.del.call(this, id, room); - if (hasRoom && !this.rooms[room]) { + if (this.withChannelMultiplexing && hasRoom && !this.rooms[room]) { var channel = this.channel + room + '#'; sub.unsubscribe(channel, function(err){ if (err) { diff --git a/test/index.js b/test/index.js index 8c273fb..6de8637 100644 --- a/test/index.js +++ b/test/index.js @@ -3,263 +3,319 @@ var http = require('http').Server; var io = require('socket.io'); var ioc = require('socket.io-client'); var expect = require('expect.js'); -var redis = require('redis').createClient; var adapter = require('../'); -describe('socket.io-redis', function(){ - - it('broadcasts', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - client1.on('woot', function(a, b, c){ - expect(a).to.eql([]); - expect(b).to.eql({ a: 'b' }); - expect(Buffer.isBuffer(c)).to.be(true); - client1.disconnect(); - client2.disconnect(); - done(); - }); - server2.on('connection', function(c2){ - var buf = new Buffer('asdfasdf', 'utf8'); - c2.broadcast.emit('woot', [], { a: 'b' }, buf); +[ + { + name: 'socket.io-redis', + create: function create(nsp, fn){ + var redis = require('redis').createClient; + var srv = http(); + var sio = io(srv); + sio.adapter(adapter({ + pubClient: redis(), + subClient: redis(null, null, { return_buffers: true }) + })); + srv.listen(function(err){ + if (err) throw err; // abort tests + if ('function' == typeof nsp) { + fn = nsp; + nsp = ''; + } + nsp = nsp || '/'; + var addr = srv.address(); + var url = 'http://localhost:' + addr.port + nsp; + fn(sio.of(nsp), ioc(url)); + }); + } + }, + { + name: 'socket.io-redis without channel multiplexing', + create: function create(nsp, fn){ + var redis = require('redis').createClient; + var srv = http(); + var sio = io(srv); + sio.adapter(adapter({ + pubClient: redis(), + subClient: redis(null, null, { return_buffers: true }), + withChannelMultiplexing: false + })); + srv.listen(function(err){ + if (err) throw err; // abort tests + if ('function' == typeof nsp) { + fn = nsp; + nsp = ''; + } + nsp = nsp || '/'; + var addr = srv.address(); + var url = 'http://localhost:' + addr.port + nsp; + fn(sio.of(nsp), ioc(url)); + }); + } + }, + { + name: 'socket.io-redis with ioredis', + create: function create(nsp, fn){ + var redis = require('ioredis').createClient; + var srv = http(); + var sio = io(srv); + sio.adapter(adapter({ + pubClient: redis(), + subClient: redis(null, null, { return_buffers: true }), + subEvent: 'messageBuffer' + })); + srv.listen(function(err){ + if (err) throw err; // abort tests + if ('function' == typeof nsp) { + fn = nsp; + nsp = ''; + } + nsp = nsp || '/'; + var addr = srv.address(); + var url = 'http://localhost:' + addr.port + nsp; + fn(sio.of(nsp), ioc(url)); + }); + } + }, +].forEach(function (suite) { + var name = suite.name; + var create = suite.create; + + describe(name, function(){ + + it('broadcasts', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + client1.on('woot', function(a, b, c){ + expect(a).to.eql([]); + expect(b).to.eql({ a: 'b' }); + expect(Buffer.isBuffer(c)).to.be(true); + client1.disconnect(); + client2.disconnect(); + done(); + }); + server2.on('connection', function(c2){ + setTimeout(function(){ + var buf = new Buffer('asdfasdf', 'utf8'); + c2.broadcast.emit('woot', [], { a: 'b' }, buf); + }, 100); + }); }); }); }); - }); - it('broadcasts to rooms', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - server1.on('connection', function(c1){ - c1.join('woot'); - }); + it('broadcasts to rooms', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + create(function(server3, client3){ + server1.on('connection', function(c1){ + c1.join('woot'); + }); - server2.on('connection', function(c2){ - // does not join, performs broadcast - c2.on('do broadcast', function(){ - c2.broadcast.to('woot').emit('broadcast'); + server2.on('connection', function(c2){ + // does not join, performs broadcast + c2.on('do broadcast', function(){ + c2.broadcast.to('woot').emit('broadcast'); + }); }); - }); - server3.on('connection', function(c3){ - // does not join, signals broadcast - client2.emit('do broadcast'); - }); + server3.on('connection', function(c3){ + // does not join, signals broadcast + client2.emit('do broadcast'); + }); - client1.on('broadcast', function(){ - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - setTimeout(done, 100); - }); + client1.on('broadcast', function(){ + client1.disconnect(); + client2.disconnect(); + client3.disconnect(); + setTimeout(done, 100); + }); - client2.on('broadcast', function(){ - throw new Error('Not in room'); - }); + client2.on('broadcast', function(){ + throw new Error('Not in room'); + }); - client3.on('broadcast', function(){ - throw new Error('Not in room'); + client3.on('broadcast', function(){ + throw new Error('Not in room'); + }); }); }); }); }); - }); - it('doesn\'t broadcast when using the local flag', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - server1.on('connection', function(c1){ - c1.join('woot'); - }); + it('doesn\'t broadcast when using the local flag', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + create(function(server3, client3){ + server1.on('connection', function(c1){ + c1.join('woot'); + }); - server2.on('connection', function(c2){ - c2.join('woot'); + server2.on('connection', function(c2){ + c2.join('woot'); - c2.on('do broadcast', function(){ - server2.local.to('woot').emit('local broadcast'); + c2.on('do broadcast', function(){ + server2.local.to('woot').emit('local broadcast'); + }); }); - }); - server3.on('connection', function(c3){ - // does not join, signals broadcast - client2.emit('do broadcast'); - }); + server3.on('connection', function(c3){ + // does not join, signals broadcast + client2.emit('do broadcast'); + }); - client1.on('local broadcast', function(){ - throw new Error('Not in local server'); - }); + client1.on('local broadcast', function(){ + throw new Error('Not in local server'); + }); - client2.on('local broadcast', function(){ - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - setTimeout(done, 100); - }); + client2.on('local broadcast', function(){ + client1.disconnect(); + client2.disconnect(); + client3.disconnect(); + setTimeout(done, 100); + }); - client3.on('local broadcast', function(){ - throw new Error('Not in local server'); + client3.on('local broadcast', function(){ + throw new Error('Not in local server'); + }); }); }); }); }); - }); - it('doesn\'t broadcast to left rooms', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - server1.on('connection', function(c1){ - c1.join('woot'); - c1.leave('woot'); - }); + it('doesn\'t broadcast to left rooms', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + create(function(server3, client3){ + server1.on('connection', function(c1){ + c1.join('woot'); + c1.leave('woot'); + }); - server2.on('connection', function(c2){ - c2.on('do broadcast', function(){ - c2.broadcast.to('woot').emit('broadcast'); - - setTimeout(function() { - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - done(); - }, 100); + server2.on('connection', function(c2){ + c2.on('do broadcast', function(){ + c2.broadcast.to('woot').emit('broadcast'); + + setTimeout(function() { + client1.disconnect(); + client2.disconnect(); + client3.disconnect(); + done(); + }, 100); + }); }); - }); - server3.on('connection', function(c3){ - client2.emit('do broadcast'); - }); + server3.on('connection', function(c3){ + client2.emit('do broadcast'); + }); - client1.on('broadcast', function(){ - throw new Error('Not in room'); + client1.on('broadcast', function(){ + throw new Error('Not in room'); + }); }); }); }); }); - }); - it('deletes rooms upon disconnection', function(done){ - create(function(server, client){ - server.on('connection', function(c){ - c.join('woot'); - c.on('disconnect', function() { - expect(c.adapter.sids[c.id]).to.be.empty(); - expect(c.adapter.rooms).to.be.empty(); - client.disconnect(); - done(); + it('deletes rooms upon disconnection', function(done){ + create(function(server, client){ + server.on('connection', function(c){ + c.join('woot'); + c.on('disconnect', function() { + expect(c.adapter.sids[c.id]).to.be.empty(); + expect(c.adapter.rooms).to.be.empty(); + client.disconnect(); + done(); + }); + c.disconnect(); }); - c.disconnect(); }); }); - }); - it('returns clients in the same room', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - var ready = 0; - - server1.on('connection', function(c1){ - c1.join('woot'); - ready++; - if(ready === 3){ - test(); - } - }); + it('returns clients in the same room', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + create(function(server3, client3){ + var ready = 0; + + server1.on('connection', function(c1){ + c1.join('woot'); + ready++; + if(ready === 3){ + test(); + } + }); - server2.on('connection', function(c1){ - c1.join('woot'); - ready++; - if(ready === 3){ - test(); - } - }); + server2.on('connection', function(c1){ + c1.join('woot'); + ready++; + if(ready === 3){ + test(); + } + }); - server3.on('connection', function(c3){ - ready++; - if(ready === 3){ - test(); - } - }); + server3.on('connection', function(c3){ + ready++; + if(ready === 3){ + test(); + } + }); - function test(){ - setTimeout(function(){ - server1.adapter.clients(['woot'], function(err, clients){ - expect(clients.length).to.eql(2); - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - done(); - }); - }, 100); - } + function test(){ + setTimeout(function(){ + server1.adapter.clients(['woot'], function(err, clients){ + expect(clients.length).to.eql(2); + client1.disconnect(); + client2.disconnect(); + client3.disconnect(); + done(); + }); + }, 100); + } + }); }); }); }); - }); - - describe('rooms', function () { - it('returns rooms of a given client', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - server1.on('connection', function(c1){ - c1.join('woot1', function () { - server1.adapter.clientRooms(c1.id, function(err, rooms){ - expect(rooms).to.eql([c1.id, 'woot1']); - client1.disconnect(); - client2.disconnect(); - done(); + describe('rooms', function () { + it('returns rooms of a given client', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + + server1.on('connection', function(c1){ + c1.join('woot1', function () { + server1.adapter.clientRooms(c1.id, function(err, rooms){ + expect(rooms).to.eql([c1.id, 'woot1']); + client1.disconnect(); + client2.disconnect(); + done(); + }); }); }); - }); + }); }); }); - }); - - it('returns rooms of a given client from another node', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - server1.on('connection', function(c1){ - c1.join('woot2', function () { - server2.adapter.clientRooms(c1.id, function(err, rooms){ - expect(rooms).to.eql([c1.id, 'woot2']); - client1.disconnect(); - client2.disconnect(); - done(); + it('returns rooms of a given client from another node', function(done){ + create(function(server1, client1){ + create(function(server2, client2){ + + server1.on('connection', function(c1){ + c1.join('woot2', function () { + server2.adapter.clientRooms(c1.id, function(err, rooms){ + expect(rooms).to.eql([c1.id, 'woot2']); + client1.disconnect(); + client2.disconnect(); + done(); + }); }); }); - }); + }); }); }); }); }); - - // create a pair of socket.io server+client - function create(nsp, fn){ - var srv = http(); - var sio = io(srv); - sio.adapter(adapter({ - pubClient: redis(), - subClient: redis(null, null, { return_buffers: true }) - })); - srv.listen(function(err){ - if (err) throw err; // abort tests - if ('function' == typeof nsp) { - fn = nsp; - nsp = ''; - } - nsp = nsp || '/'; - var addr = srv.address(); - var url = 'http://localhost:' + addr.port + nsp; - fn(sio.of(nsp), ioc(url)); - }); - } - }); diff --git a/test/ioredis.js b/test/ioredis.js deleted file mode 100644 index e985939..0000000 --- a/test/ioredis.js +++ /dev/null @@ -1,268 +0,0 @@ - -var http = require('http').Server; -var io = require('socket.io'); -var ioc = require('socket.io-client'); -var expect = require('expect.js'); -var redis = require('ioredis').createClient; -var adapter = require('../'); - -describe('socket.io-redis with ioredis', function(){ - - it('broadcasts', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - client1.on('woot', function(a, b, c){ - expect(a).to.eql([]); - expect(b).to.eql({ a: 'b' }); - expect(Buffer.isBuffer(c)).to.be(true); - client1.disconnect(); - client2.disconnect(); - done(); - }); - server2.on('connection', function(c2){ - setTimeout(function(){ - var buf = new Buffer('asdfasdf', 'utf8'); - c2.broadcast.emit('woot', [], { a: 'b' }, buf); - }, 100); - }); - }); - }); - }); - - it('broadcasts to rooms', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - server1.on('connection', function(c1){ - c1.join('woot'); - }); - - server2.on('connection', function(c2){ - // does not join, performs broadcast - c2.on('do broadcast', function(){ - c2.broadcast.to('woot').emit('broadcast'); - }); - }); - - server3.on('connection', function(c3){ - // does not join, signals broadcast - client2.emit('do broadcast'); - }); - - client1.on('broadcast', function(){ - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - setTimeout(done, 100); - }); - - client2.on('broadcast', function(){ - throw new Error('Not in room'); - }); - - client3.on('broadcast', function(){ - throw new Error('Not in room'); - }); - }); - }); - }); - }); - - it('doesn\'t broadcast when using the local flag', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - server1.on('connection', function(c1){ - c1.join('woot'); - }); - - server2.on('connection', function(c2){ - c2.join('woot'); - - c2.on('do broadcast', function(){ - server2.local.to('woot').emit('local broadcast'); - }); - }); - - server3.on('connection', function(c3){ - // does not join, signals broadcast - client2.emit('do broadcast'); - }); - - client1.on('local broadcast', function(){ - throw new Error('Not in local server'); - }); - - client2.on('local broadcast', function(){ - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - setTimeout(done, 100); - }); - - client3.on('local broadcast', function(){ - throw new Error('Not in local server'); - }); - }); - }); - }); - }); - - it('doesn\'t broadcast to left rooms', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - server1.on('connection', function(c1){ - c1.join('woot'); - c1.leave('woot'); - }); - - server2.on('connection', function(c2){ - c2.on('do broadcast', function(){ - c2.broadcast.to('woot').emit('broadcast'); - - setTimeout(function() { - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - done(); - }, 100); - }); - }); - - server3.on('connection', function(c3){ - client2.emit('do broadcast'); - }); - - client1.on('broadcast', function(){ - throw new Error('Not in room'); - }); - }); - }); - }); - }); - - it('deletes rooms upon disconnection', function(done){ - create(function(server, client){ - server.on('connection', function(c){ - c.join('woot'); - c.on('disconnect', function() { - expect(c.adapter.sids[c.id]).to.be.empty(); - expect(c.adapter.rooms).to.be.empty(); - client.disconnect(); - done(); - }); - c.disconnect(); - }); - }); - }); - - it('returns clients in the same room', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - create(function(server3, client3){ - var ready = 0; - - server1.on('connection', function(c1){ - c1.join('woot'); - ready++; - if(ready === 3){ - test(); - } - }); - - server2.on('connection', function(c1){ - c1.join('woot'); - ready++; - if(ready === 3){ - test(); - } - }); - - server3.on('connection', function(c3){ - ready++; - if(ready === 3){ - test(); - } - }); - - function test(){ - setTimeout(function(){ - server1.adapter.clients(['woot'], function(err, clients){ - expect(clients.length).to.eql(2); - client1.disconnect(); - client2.disconnect(); - client3.disconnect(); - done(); - }); - }, 100); - } - - }); - }); - }); - }); - - describe('rooms', function () { - it('returns rooms of a given client', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - - server1.on('connection', function(c1){ - c1.join('woot1', function () { - server1.adapter.clientRooms(c1.id, function(err, rooms){ - expect(rooms).to.eql([c1.id, 'woot1']); - client1.disconnect(); - client2.disconnect(); - done(); - }); - }); - }); - - }); - }); - }); - - it('returns rooms of a given client from another node', function(done){ - create(function(server1, client1){ - create(function(server2, client2){ - - server1.on('connection', function(c1){ - c1.join('woot2', function () { - server2.adapter.clientRooms(c1.id, function(err, rooms){ - expect(rooms).to.eql([c1.id, 'woot2']); - client1.disconnect(); - client2.disconnect(); - done(); - }); - }); - }); - - }); - }); - }); - }); - - // create a pair of socket.io server+client - function create(nsp, fn){ - var srv = http(); - var sio = io(srv); - sio.adapter(adapter({ - pubClient: redis(), - subClient: redis(null, null, { return_buffers: true }), - subEvent: 'messageBuffer' - })); - srv.listen(function(err){ - if (err) throw err; // abort tests - if ('function' == typeof nsp) { - fn = nsp; - nsp = ''; - } - nsp = nsp || '/'; - var addr = srv.address(); - var url = 'http://localhost:' + addr.port + nsp; - fn(sio.of(nsp), ioc(url)); - }); - } - -});