如何编写一个简单的 Redis 客户端

老雷 创作于 2016-05-12 , 2016-08-03 第 1 次更新
Node.js Client Redis 全文约 8854 字,预计阅读时间为 23 分钟

前几天写了篇文章《如何用 Node.js 编写一个 API 客户端》http://morning.work/page/2016-05/how-to-write-a-nodejs-api-client-package.html ),有人说这 不能算是一个 API 客户端,顶多是一个支持 GET / POST 操作的模块 ,羞得我老脸微微一红,故作镇静地自然自语道,简单是简单点了,好歹也是个 API 客户端嘛。

这次要写的这个 Redis 客户端应该算是个客户端了,需要直接发起TCP/IP连接去跟服务器通讯,需要自己解析客户端返回的结果,还要做一些简单的容错处理,如果要做到足够健壮也不容易,不过就本文要实现一个基本可用的例子来说,还是简单了点。

无论是实现 REST 的 API 客户端还是这样一个 Redis 客户端,虽然具体实现的细节不同,但是,套路还是一样的。二十一世纪行走江湖最重要的是什么?套路!套路!套路!所以呢,本文还是跟之前一样的套路。

Redis 协议

要开始编写一个 Redis 客户端,我们首先要知道怎么去跟 Redis 通讯,比如要执行GET a应该按照什么样的格式给服务器发送指令,服务器返回的结果又是什么样的格式这些。Redis 协议的详细介绍可以参考这里:http://redis.cn/topics/protocol.html

假如我要执行命令KEYS *,只要往服务器发送KEYS *\r\n即可,这时服务器会直接响应结果,返回的结果格式如下:

每一行都使用\r\n来分隔。

为了查看具体的返回结果是怎样的,我们可以用nc命令来测试。假定本机已经运行了 Redis 服务,其监听端口为6379,我们可以执行以下命令连接:

nc 127.0.0.1 6379

如果本机没有nc命令(比如 Windows 用户),可以使用telnet命令:

telnet 127.0.0.1 6379

下面我们分别测试各个命令返回的结果(其中第一行表示客户端输入的命令,行尾的表示按回车发送,第二行开始表示服务器端返回的内容):

1、返回错误

help ↵

-ERR unknown command 'help'

2、操作成功

set abc 123456 ↵

+OK

3、得到结果

get abc ↵

$6
123456

4、得不到结果

get aaa ↵

$-1

5、得到的结果是整形数字

hlen aaa ↵

:5

6、数组结果

keys a* ↵

*3
$3
abc
$3
aa1
$1
a

7、多命令执行

multi ↵

+OK

get a ↵

+QUEUED

get b ↵

+QUEUED

get c ↵

+QUEUED

exec ↵

*3
$5
hello
$-1
$5
world

解析结果

实现一个 Redis 客户端大概的原理是,客户端依次把需要执行的命令发送给服务器,而服务器会按照先后顺序把结果返回给用户。在本文我们使用 Node.js 内置的net模块来操作,通过data事件来接收结果。需要注意的是,有时候结果太长我们可能要几次data事件才能拿到完整的结果,有时可能是一个data事件中包含了几条命令的执行结果,也有可能当前命令的结果还没有传输完,剩下一半的结果在下一个data事件中。

为了方便调试,我们将解析结果的部分独立封装成一个函数,接口如下:

const proto = new RedisProto();

// 接受到数据
proto.push('*3\r\n$3\r\nabc\r\n$3\r\naa1\r\n$1\r\na\r\n');
proto.push('$6\r\n123456\r\n');
proto.push('-ERR unknown command \'help\'\r\n');
proto.push('+OK\r\n');
proto.push(':5\r\n');
proto.push('*3\r\n$5\r\nhe');
proto.push('llo\r\n$-');
proto.push('1\r\n$5\r\nworld\r\n');

while (proto.next()) {
  // proto.next() 如果有解析出完整的结果则返回结果,没有则返回 false
  // 另外可以通过 proto.result 获得
  console.log(proto.result);
}

接下来开始编写相应的代码。

按照套路,我们先初始化项目:

mkdir redis_client
cd redis_client
git init
npm init

新建文件proto.js

'use strict';

/**
 * 简单 Redis 客户端
 *
 * @author Zongmin Lei <leizongmin@gmail.com>
 */

class RedisProto {

  constructor() {

    this._lines = []; // 已初步解析出来的行
    this._text = '';  // 剩余不能构成一行的文本

  }

  // 将收到的数据添加到缓冲区
  push(text) {

    // 将结果按照\r\n 分隔
    const lines = (this._text + text).split('\r\n');
    // 如果结尾是\r\n,那么数组最后一个元素肯定是一个空字符串
    // 否则,我们应该将剩余的部分跟下一个 data 事件接收到的数据连起来
    this._text = lines.pop();
    this._lines = this._lines.concat(lines);

  }

  // 解析下一个结果,如果没有则返回 null
  next() {

    const lines = this._lines;
    const first = lines[0];

    // 去掉指定数量的行,并且返回结果
    const popResult = (lineNumber, result) => {
      this._lines = this._lines.slice(lineNumber);
      return this.result = result;
    };

    // 返回空结果
    const popEmpty = () => {
      return this.result = false;
    };

    if (lines.length < 1) return popEmpty();

    switch (first[0]) {

      case '+':
        return popResult(1, {data: first.slice(1)});

      case '-':
        return popResult(1, {error: first.slice(1)});

      case ':':
        return popResult(1, {data: Number(first.slice(1))});

      case '$': {
        const n = Number(first.slice(1));
        if (n === -1) {
          // 如果是 $-1 表示空结果
          return popResult(1, {data: null});
        } else {
          // 否则取后面一行作为结果
          const second = lines[1];
          if (typeof second !== 'undefined') {
            return popResult(2, {data: second});
          } else {
            return popEmpty();
          }
        }
      }

      case '*': {
        const n = Number(first.slice(1));
        if (n === 0) {
          return popResult(1, {data: []});
        } else {
          const array = [];
          let i = 1;
          for (; i < lines.length && array.length < n; i++) {
            const a = lines[i];
            const b = lines[i + 1];
            if (a.slice(0, 3) === '$-1') {
              array.push(null);
            } else if (a[0] === ':') {
              array.push(Number(a.slice(1)));
            } else {
              if (typeof b !== 'undefined') {
                array.push(b);
                i++;
              } else {
                return popEmpty();
              }
            }
          }
          if (array.length === n) {
            return popResult(i, {data: array});
          } else {
            return popEmpty();
          }
        }
      }

      default:
        return popEmpty();

    }

  }

}

module.exports = RedisProto;

执行上文中的测试代码可得到如下结果:

{ data: '123456' }
{ data: [ 'abc', 'aa1', 'a' ] }
{ error: 'ERR unknown command \'help\'' }
{ data: 'OK' }
{ data: 5 }
{ data: [ 'hello', null, 'world' ] }

实现 Redis 客户端

上文我们已经实现了一个简单的解析器,其可以通过push()将接收到的数据片段加进去,然后我们只需要不断地调用next()来获取下一个解析出来的结果即可,直到其返回false,在下一次收到数据时,重复刚才的动作。

新建文件index.js

'use strict';

/**
 * 简单 Redis 客户端
 *
 * @author Zongmin Lei <leizongmin@gmail.com>
 */

const events = require('events');
const net = require('net');
const RedisProto = require('./proto');

class Redis extends events.EventEmitter {

  constructor(options) {
    super();

    // 默认连接配置
    options = options || {};
    options.host = options.host || '127.0.0.1';
    options.port = options.port || 6379;
    this.options = options;

    // 连接状态
    this._isClosed = false;
    this._isConnected = false;

    // 回调函数列表
    this._callbacks = [];

    this._proto = new RedisProto();

    this.connection = net.createConnection(options.port, options.host, () => {
      this._isConnected = true;
      this.emit('connect');
    });

    this.connection.on('error', err => {
      this.emit('error', err);
    });

    this.connection.on('close', () => {
      this._isClosed = true;
      this.emit('close');
    });

    this.connection.on('end', () => {
      this.emit('end');
    });

    this.connection.on('data', data => {
      this._pushData(data);
    });

  }

  // 发送命令给服务器
  sendCommand(cmd, callback) {
    return new Promise((resolve, reject) => {

      const cb = (err, ret) => {
        callback && callback(err, ret);
        err ? reject(err) : resolve(ret);
      };

      // 如果当前连接已断开,直接返回错误
      if (this._isClosed) {
        return cb(new Error('connection has been closed'));
      }

      // 将回调函数添加到队列
      this._callbacks.push(cb);
      // 发送命令
      this.connection.write(`${cmd}\r\n`);

    });
  }

  // 接收到数据,循环结果
  _pushData(data) {

    this._proto.push(data);

    while (this._proto.next()) {

      const result = this._proto.result;
      const cb = this._callbacks.shift();

      if (result.error) {
        cb(new Error(result.error));
      } else {
        cb(null, result.data);
      }

    }

  }

  // 关闭连接
  end() {
    this.connection.destroy();
  }

}

module.exports = Redis;

说明:

新建测试文件test.js

'use strict';

const Redis = require('./index');
const client = new Redis();

client.sendCommand('GET a', (err, ret) => {
  console.log('a=%s, err=%s', ret, err);
});

client.sendCommand('GET b', (err, ret) => {
  console.log('b=%s, err=%s', ret, err);
});

client.sendCommand('KEYS *IO*', (err, ret) => {
  console.log('KEYS *IO*=%s, err=%s', ret, err);
});

client.sendCommand('OOXX', (err, ret) => {
  console.log('OOXX=%s, err=%s', ret, err);
});

client.sendCommand('SET a ' + Date.now())
  .then(ret => console.log('success', ret))
  .catch(err => console.log('error', err))
  .then(() => client.end());

执行测试文件node test.js可看到类似如下的结果:

a=1463041835231, err=null
b=null, err=null
KEYS *IO*=sess:cz5F-npwOnw0FmesT6JjqJPL13IO8AzV,sess:NS90IkF6uZNAm-FPEAWXHuh3JrIW1-IO, err=null
OOXX=undefined, err=Error: ERR unknown command 'OOXX'
success OK

从结果中可以看出我们这个 Redis 客户端已经基本能用了。

更友好的接口

上文我们实现了一个sendCommand()方法,理论上可以通过该方法执行任意的 Redis 命令,但是我们可能更希望每条命令有一个对应的方法,比如sendCommand('GET a')我们可以写成get('a'),这样看起来会更直观。

首先在index.js文件头部载入fspath模块:

const fs = require('fs');
const path = require('path');

然后给Redis类增加_bindCommands()方法:

_bindCommands() {

  const self = this;

  // 绑定命令
  const bind = (cmd) => {
    return function () {

      let args = Array.prototype.slice.call(arguments);
      let callback;
      if (typeof args[args.length - 1] === 'function') {
        callback = args.pop();
      }

      // 每个参数使用空格分隔
      args = args.map(item => Array.isArray(item) ? item.join(' ') : item).join(' ');

      return self.sendCommand(`${cmd} ${args}`, callback);

    };
  };

  // 从文件读取命令列表
  const cmdList = fs.readFileSync(path.resolve(__dirname, 'cmd.txt')).toString().split('\n');
  for (const cmd of cmdList) {

    // 同时支持大写和小写的函数名
    this[cmd.toLowerCase()] = this[cmd.toUpperCase()] = bind(cmd);

  }

}

然后在Redis类的constructor()方法尾部增加以下代码:

this._bindCommands();

由于在_bindCommands()中通过读取cmd.txt文件来读取 Redis 的命令列表,所以还需要新建文件cmd.txt,内容格式为每条命令一行(由于篇幅限制,本文只列出需要用到的几条命令):

GET
SET
AUTH
MULTI
EXEC
KEYS

把测试文件test.js改为以下代码:

'use strict';

const Redis = require('./index');
const client = new Redis();

client.get('a', (err, ret) => {
  console.log('a=%s, err=%s', ret, err);
});

client.get('b', (err, ret) => {
  console.log('b=%s, err=%s', ret, err);
});

client.keys('*IO*', (err, ret) => {
  console.log('KEYS *IO*=%s, err=%s', ret, err);
});

client.set('a', Date.now())
  .then(ret => console.log('success', ret))
  .catch(err => console.log('error', err))
  .then(() => client.end())

重新执行node test.js可看到结果跟上文还是一致的。

简单容错处理

假如将测试文件test.js改为这样:

'use strict';

const Redis = require('./index');
const client = new Redis();

client.get('a', (err, ret) => {
  console.log('a=%s, err=%s', ret, err);

  client.end();

  client.get('b', (err, ret) => {
    console.log('b=%s, err=%s', ret, err);
  });
});

在完成get('a')的时候,我们执行client.end()关闭了连接,然后再执行get('b'),大多数情况下将会得到如下的结果:

a=1463042964235, err=null

get('b')的回调函数并没有被执行,因为我们在关闭连接后,再也没有收到服务端返回的结果。另外也有可能是因为其他原因,客户端与服务端的连接断开了,此时我们应该能执行回调并返回一个错误。

在文件index.js中给Redis类增加一个方法_callbackAll()

_callbackAll() {

  for (const cb of this._callbacks) {
    cb(new Error('connection has been closed'));
  }
  this._callbacks = [];

}

另外,在constructor()方法内,将监听连接的close事件部分代码改成这样:

this.connection.on('close', () => {
  this._isClosed = true;
  this.emit('close');
  this._callbackAll();
});

重新执行node test.js,从执行结果可看出所有回调函数均已被执行:

a=1463042964235, err=null
b=undefined, err=Error: connection has been closed

还存在的问题

看起来这个模块已经能正常使用了,但是其实并不完善。跟 NPM 上的ioredis模块起来还存在以下问题:

相关链接