作者简介:赵磊,Uber高级工程师,08年上海交通大学毕业,曾就职于微软,后加入Facebook主要负责Messenger的后端消息服务。这个系统在当时支持Facebook全球5亿人同时在线。目前在Uber负责消息系统的构建并推进核心服务在高可用性方向的发展。

前言

赵磊在7月21号的全球架构师峰会深圳站上,做了主题演讲: Uber高可用消息系统构建, 对于这个热门主题,高可用架构群展开了热议,大家对分布式系统中的各种错误处理非常感兴趣。Tim Yang特邀赵磊通过微信群,在大洋彼岸的硅谷给大家进一步分享。

分布式系统单点故障怎么办?

non-sharded, stateless类型服务非常容易解决单点故障。 通常load balancer可以按照固定的时间间隔,去health check每个node, 当某一个node出现故障时,load balancer可以把故障的node从pool中排除。

很多服务的health check设计成简单的TCP connect, 或者用HTTP GET的方式,去ping一个特定的endpoint。当业务逻辑比较复杂时,可能业务endpoint故障,但是health endpoint还能正常返回,导致load balancer无法发现单点故障,这种情况可以考虑在health check endpoint中增加简单的业务逻辑判断。

对于短时间的network故障,可能会导致这段时间很多RPC call failures。 在RPC client端通常会实现backoff retry。 failure可能有几种原因:

TCP connect fail,这种情况下retry不会影响业务逻辑,因为Handler还没有执行。

receive timeout, client无法确定handler是不是已经收到了request 而且处理了request,如果handler重复执行会产生side effect,比如database write或者访问其他的service, client retry可能会影响业务逻辑。

对于sharded service,关键是如何找到故障点,而且将更新的membership同步到所有的nodes。下面讨论几种sharding的方案:

将key space hash到很多个小的shard space, 比如4K个shards。 通过zookeeper (distributed mutex) 选出一个master,来将shard分配到node上,而且health check每一个node。当遇到单点故障时,将已经assigned的shards转移到其他的nodes上。 因为全局只有一个single master, 从而保证了shard map的全局一致。当master故障时,其他的backup node会获得lock成为Master

Consistent hashing方式。consistent hashing 通常用来实现cache cluster,不保证一致性。 因为每个client会独立health check每一个node, 同时更新局部的membership。 在network partition的情况或者某一个node不停的重启, 很可能不同的client上的membership不一致,从而将相同的key写在了不同的node上。 当一致性的需求提高时,需要collaborative health check, 即每个node要monitor所有其他node的health。 Uber在这里使用的是gossip protocol,node之间交换health check的信息。

大面积故障怎么办?

大面积故障时,比如交换机故障(rack switch failure),可用的机器不足以处理所有的请求。 我们尽可能做的就是用50%的capacity 处理50%的请求或者50%用户的所有请求。而尽量避免整个服务故障。 当设计一个服务的时候,它的throughput应该是可linear scale的。

在同样的CPU占用情况下,1个机器应该处理100个请求,那么5个机器应该可以处理500个请求。

而且在同样的机器数量下,20%的CPU可以处理200个请求,那么60%的CPU应该可以处理3倍即600个请求。

后者是很难实现的,而且当CPU越高的时候,服务的throughput并不是线性的。 通常在80%CPU以上的情况,throughput会下降非常快。 随着CPU使用增加,request的latency也会提高。 这对上下游的服务可能都是一个挑战,可能会导致cascade failure。

对于nodejs或者java nio一类的async IO框架来说,另外一个问题就是event loop lag。 这两者可能导致connection数量增加。下面举两个例子

有些RPC transport支持pipelining但不支持multiplexing (out of order responses), pipelining是指在同一个TCP连接上可以连续发出Req1, Req2, Req3, Response1, Response2, Response3,即Response的顺序必须和Request的顺序是一致。Req1如果需要很长时间,Req2和3就都不能返回。一个Request如果占用太长时间,会导致后面的很多个Request timeout。RPC client通常也会限制在一个TCP connection上面的max pending requests。但timeout发生,或者max pending requests情况下,client会主动创建新的connection。

event loop lag 是指程序占用太长时间执行连续的CPU intensive任务。 只有当任务结束时,event loop才会handle IO events,比如从socket上面读数据。否则收到的数据只能保存在kernel 的TCP buffer里,通常这个buffer size小于64KB。当buffer满时(而且service又很长时间没有读buffer),socket的远端就不能发送更多的数据。这时也会导致远端的transport error。同样的,client会主动创建新的connection,当connection增加到预设的fd limit时,service就不能继续accept新的TCP connection了,其实是不能open新的文件了。而且,绝大部分的程序没有测试过达到fd limit的场景。很多API需要open file, 比如logging和core dump. 所以,一旦达到fd limit, 就像out of memory一样,将很难recover,只能crash process. 而这时正是过载的时候,重启实际上减少了capacity。 任何crash在过载的情况下只会更糟。

facebook在这防止过载上做的很好,在C++实现的thrift server上,有一个或者多个threads只负责accept TCP connections. 你可以指定最多的connections for thrift calls。 这个connection limit是远小于fd limit, 当connection太多时,thrift server可以fail fast。所以,这种情况下可以让service能一直保持在max qps。

整个数据中心挂掉怎么办?

在Uber的场景中,如果rider已经在一个trip上了,我们通产会等trip结束后才把rider迁移到其他的数据中心,我们叫做 soft failover 。

否则需要hard failover,我们会把DNS指向其他的数据中心。 而且用户的DNS服务器很可能在一段时间内还是cache以前的ip,而且这个cache的时间是基本没办法控制的,所以我们会在load balancer上返回HTTP redirect,这样手机的客户端收到后会立即转向新的备份数据中心。

惊群问题(thundering herd), 很多服务在provision的时候根据平常的QPS预留了很少的容量空间,当数据中心或者load balancer重启的时候,如果所有的客户端同时发起请求,这时的QPS可以是平时的很多倍。 很可能导致大部分请求都失败。一方面需要在客户端实现exponential backoff, 即请求失败后retry的间隔时间是增长的,比如1秒,5秒,20秒等等。另外在load balancer上实现rate limiting或者global blackhole switch, 后者可以有效的丢掉一部分请求而避免过载,同时尽早触发客户端的backoff逻辑。

如果大家用AWS或者其他云服务的话,AWS的一个region通常包括几个数据中心。各个数据中心甚至在相邻的介个城市,有独立的空调系统和供电。

数据中心之间有独立的网络 high throughput low latency, 但是在region之间的网络通常是共有的 high throughput high lantecy

整个region挂掉很少发生。可以把服务部署在多个可用区(Availability Zone)来保证高可用性。

Q & A

Q1:health check endpoint中实现简单的业务逻辑,这个意思是load balancer中有业务逻辑检查的插件么?这样load balancer会不会很重啊,可以详细说一下么?

load balancer仍然是HTTP GET, health check 没有额外的开销,但是服务本身处理health的方式不同,可加入业务逻辑相关的检查 比如是不是能够访问数据库。

Q2:region切换时,用户的数据是怎么迁移的?

这个是个很好的问题,Uber采取的是个非常特别的方法。 realtime系统会在每次用户state change。state change的时候把新的state下载到手机上,而且是加密的。当用户需要迁移到新的数据中心的时候,手机需要上传之前下载的state,服务就可以从之前的state开始,但是non-realtime系统 比如用户数据是通过sql replication来同步的。是Master-master。而且Uber在上层有个数据抽象,数据是基本上immutable的 append-only 所以基本不存在冲突。

Q3:如果是req timeout,但另外一边已经执行成功了,这时候重试,那不就是产生了两次数据?特别是insert这种类型的。

是的,如果是GET类型的请求可以retry, 但是POST类型的请求 那么只能在conn timeout时可以安全的retry。 但是receive timeout不能重试。(Tim补充看法:对于POST请求,如果service实现了幂等操作也是可以retry)。 有些类型的数据可以自动merge比如set和map

Q4:那receive timeout,这种情况下,只能通过merge或者冲突对比解决?

恩 是的。 需要在逻辑层判断是不是能够retry。 这个我建议在更上层实现, 比如在消息系统中,全程不retry 就可以保证at most once delivery, 如果需要保证at least once delivery 需要加入数据库和client dedupe

Q5:大面积故障时Uber用什么手段来控制只处理部分用户请求?

我们实现了一些rate limiting 和 circuit breaking的库,但是这时针对所有请求的。 我们现在还没有做到只处理某些用户的请求。

Q6:“将key space hash到相对小的shard space, 因为全局只有一个single master, 从而保证了shard map的全局一致” 这个方案每次计算shard node的时候,必须先询问下master么?

是的。 在client端有一个shard map的cache, 每隔几秒钟可以refresh, 如果是复杂的实现,则可以是master 推送shardmap change。

Q7:多个机房的数据是sharding存储(就是每个机房只存储一部分用户数据),还是所有机房都有所有用户全量数据?

Uber现在的做法是每个机房有所有用户的数据。 facebook的做法是一个机房有一部分用户的数据。

Q8:那多个机房的数据同步采用什么方案?

facebook用的就是mysql replication,有些细节我不清楚。 Uber还没有跨数据中心的replication,但是我们考虑买riak的enterprise服务,可以支持跨数据中心的 replication。 对于sql数据 我们就2个方案:大部分用户数据还是在postgresql里的(没有sharding, 是个single node),因为Uber起家的时候就在postgres上,这个数据是用postgres原生支持的replication, 另外有个mysql的, mysql存的是trip的数据, 所以是append only而且不需要merge的。 这个我还需要确认是不是每个数据中心里面有全量的数据还是只有本地产生的trip数据。

Uber数据抽象做的比较好,数据分为3类:

最小的 realtime的,跟ongoing trip的个数成正比。 正在迁移到riak

比较大 非realtime的,跟user个数成正比。在postgresql里面 用postgresql的relication,正在迁移到mysql,用mysql的replication

大 非realtime的,跟trip个数成正比。 在MySQL里面有很多partition,一个用户在一个partitionl里面,一个partition一个全局的master,写都去master。 而且Partition很少迁移,所以当seconary变成Master时,可能没有用户之前的trip的信息,replication是offline的 好像是通过backup-restore实现的。

Q9: 那如何实现“每个机房都有全量数据”的?

不是实时的,是在应用层实现的,而且现在还没开始大规模使用。 另外问下riak 有同学在用么? Uber 的很多系统去年就开始迁移到riak上了,因为riak是保证availability的 。将来在Uber会是重点

Q10:Uber的消息系统是基于nodejs的吗?客户端长链接的性能和效率方面如何优化?

是基于nodejs的。我们没有特别优化性能,不过stress test看起来2个物理机可以保持800K连接

Q11:Uber消息系统协议自己DIY吗? 是否基于TLS? PUSH消息QPS能达到多少?

是的,基于HTTPS。 具体QPS我不太记得了。

Q12:riak的性能如何?主要存储哪些类型的数据呢?存储引擎用什么?raik的二级索引有没有用到呢?

riak性能我没测试过,跟数据类型和consistency level都有关系。 可能差别比较大。 我们现在用的好像是leveldb

Q13:应用层实现多机房数据一致的话,是同时多写吗? 这个latency会不会太长?

sql现在都是用在non-realtime系统里面,所以latency可能会比较长

Q14:Uber rpc用的什么框架,上面提到了Thrift有好的fail fast策略,Uber有没有在rpc框架层面进行fail fast设计?

Uber在RPC方面还刚开始。 我们一直是用http+json的,最近在朝tchannel+thrift发展, tchannel是一个类似http2.0的transport,tchannel 在github上能找到。

我们的nodejs thrift 是自己实现的,因为apache thrift在node上做的不是很好,thrift的实现叫做thriftify https://github.com/Uber/thriftify

正好推荐下我的开源项目哈。 在thrift server上我们没有做fail fast, 如何保护是在routing service中实现的。

Q15:Uber走https协议,有没有考虑spdy/http2.0之类的呢?在中国网速状况不是很好的,Uber有没有一些https连接方面的优化措施?

正在考虑迁移到HTTP2.0,这个主要是手机端有没有相应的client实现。 server端我们用的是nginx,nginx上有个experiemnt quality的extension可以支持spdy。 我们还考虑过用facebook的proxygen https://github.com/facebook/proxygen,proxygen支持spdy。 我在facebook的chat service是用proxygen实现的,而且facebook 几十万台PHP server都在proxygen上,所以可以说是工业级强度的基础设施,不过build起来要花点时间。

Q16:为了避免服务过载和cascade failure,除了在服务链的前端采用一些fail fast 的设计,还有没有其它的实践作法,比如还是想支持一部分用户或特定类型的请求,采用优先级队列等。 就这个问题,Uber,facebook在服务化系统中还有没有其它技术实践?另外出现大规模服务过载后的恢复流程方面,有没有碰到什么坑或建议?

“比如还是想支持一部分用户或特定类型的请求” 这个其实比较难实现 因为当服务过载的时候 在acceptor thread就停止接受新的connection了,那就不知道是哪个用户的请求 。这个需要在应用层实现,比如feature flag可以针对一些用户关掉一些feature。 我发现有个很有用的东西就是facebook有个global kill switch,可以允许x%的流量,这个当所有service一起crash 重启的时候比较有用。

关注中国IDC圈官方微信:idc-quan 我们将定期推送IDC产业最新资讯

查看心情排行你看到此篇文章的感受是:


  • 支持

  • 高兴

  • 震惊

  • 愤怒

  • 无聊

  • 无奈

  • 谎言

  • 枪稿

  • 不解

  • 标题党