Part 2: Shuffle Sharding with Zone Awareness

Introduction

In the previous post, we explored a primitive implementation of shuffle sharding. This post extends that algorithm with zone awareness.

1.1 Problem Statement & Approach

Problem Statement: Given a list of servers spread across different Availability Zones (in a cloud computing context), find a way to distribute them into shards with skewness guarantees.

Skewness describes how servers are distributed across Availability Zones. Without an even distribution of servers across AZs, a zone-wide outage causes a larger quality degradation. To address this, we define a configurable constraint, maxSkew, for distributing servers across zones within a given shard.

maxSkew for this problem (inspired by the K8s context) can be defined as max( [(numberOfServers - globalMin) for each AvailabilityZone] ), where globalMin is the minimum number of servers in a zone within that shard. For example, if we have 3 zones, 3 servers per zone & 5 servers per shard, then the maxSkew for the following distributions would be:

Servers in zones (A, B, C)    maxSkew
(5, 0, 0)                     max(5-0, 0-0, 0-0) = 5
(4, 1, 0)                     max(4-0, 1-0, 0-0) = 4
(3, 1, 1)                     max(3-1, 1-1, 1-1) = 2
(2, 2, 1)                     max(2-1, 2-1, 1-1) = 1
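
The same calculation can be written as a small Go helper. This is just an illustration of the formula above; the name computeMaxSkew and its standalone form are my own, not part of the load balancer code.

// computeMaxSkew returns max(count - globalMin) over all zones, where
// globalMin is the smallest server count among the zones of a shard.
func computeMaxSkew(zoneCounts map[string]int) int {
    first := true
    globalMin := 0
    for _, c := range zoneCounts {
        if first || c < globalMin {
            globalMin = c
            first = false
        }
    }
    maxSkew := 0
    for _, c := range zoneCounts {
        if c-globalMin > maxSkew {
            maxSkew = c - globalMin
        }
    }
    return maxSkew
}

For instance, computeMaxSkew(map[string]int{"A": 2, "B": 2, "C": 1}) returns 1, matching the last row of the table.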

Why is this problem important?

  • A major theme among all scalable distributed systems is to prevent SPOF (Single Point of Failure). A good design anticipates failure and ensures redundancy at every layer of the system.

  • In part 1 of this post, we anticipated failure at the server pool level i.e. what happens when a DDOSing client takes down servers one after another. To mitigate this, we implemented shuffle sharding to ensure that rogue clients don't take down all the servers.

  • But what if an entire data center goes down due to other reasons, like a power failure or a natural disaster? To mitigate this, we run our systems in multiple data centers. But we also want shuffle sharding in place to avoid regressing to the first failure mode. Thus, the solution needs to be shuffle sharding with zone awareness incorporated into it.

Two possible implementation approaches came to my mind:

  • A: Potpourri (a top-down approach): Consider all nCr combinations (where n = sum of all servers across all AZs and r = the shard size) and eliminate unwanted combinations by imposing the skew constraint.

  • B: Bottom-up Approach: Generalise a way to consider only those combinations which would satisfy the constraint.

In my case, I found approach A to be more solvable (but I'd be happy to hear about solutions via a bottom-up approach as well).

1.1.1 Potpourri approach

Generating Shards

  • The approach is similar to Part 1 of this series. The only differences are that the shape of the data changes (since we now have to track availability zones) and that an additional validation step checks the skewness constraint before deciding whether a particular server combination is an eligible shard.

  • The advantage of this potpourri approach is that if we have additional constraints on shard eligibility in the future, we just have to plug another function into this shard generation function (see the sketch below).
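
The names below are hypothetical and not taken from the actual code; the idea is to model eligibility checks as a slice of predicate functions that the generator runs against every candidate combination (the current code simply calls maxSkewSatisfied() directly).

// shardPredicate reports whether a candidate combination is still
// eligible to become a shard.
type shardPredicate func(combo []map[string]string) bool

// eligible runs every registered constraint against a candidate combination.
func eligible(combo []map[string]string, constraints []shardPredicate) bool {
    for _, check := range constraints {
        if !check(combo) {
            return false
        }
    }
    return true
}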

// TLDR: Given a set of items (`totalServerList`) and a subset size `shardSize`,
// determine all subset combinations (nCr) provided that the 
// skewness constraint (`maxSkewSatisfied()`) is respected. 
func generateZAServerCombos(totalServerList []map[string][]string, shardSize int, maxSkew int) [][]map[string]string {

    var potpourri []map[string]string        // Looks like: [ {server: "localhost:8080", az: "east-1"}, {server: "localhost:8081", az: "east-2"} ]
    result := make([][]map[string]string, 0) // Slice with each element of type `[]map[string]string`
    azCount := make(map[string]int)

    for _, azServerList := range totalServerList {
        // create potpourri of all servers
        for az, serversInAZ := range azServerList {
            azCount[az] = 0
            for _, server := range serversInAZ {                
                server := map[string]string{"server": server, "az": az}
                potpourri = append(potpourri, server)
            }
        }
    }

    var util_nCr func(start int, curServerCombo []map[string]string)
    util_nCr = func(start int, curServerCombo []map[string]string) {
        // Basecase: When shard is full
        if len(curServerCombo) == shardSize {
            // make a copy of current combination
            dst := make([]map[string]string, shardSize)
            copy(dst, curServerCombo)
            // fmt.Printf("Another shard found: %v\n\n/////////////////////////////////\n", dst)
            result = append(result, dst)
            return
        }

        // Traverse along array containing all servers & pick 
        // servers to construct each combination in nCr
        for i := start; i < len(potpourri); i++ {

            // If the skewness constraint is not satisfied, reject this
            // combination. We pass a copy of `azCount` because Go maps
            // are pointers to the runtime hmap structure, so passing the
            // map directly would behave like pass-by-reference.
            if maxSkewSatisfied(append(curServerCombo, potpourri[i]), mapClone(azCount), maxSkew) {
                curServerCombo = append(curServerCombo, potpourri[i])
                // fmt.Printf("\ncurServerCombo: %v\n##########\n", curServerCombo)
                util_nCr(i+1, curServerCombo)
                curServerCombo = curServerCombo[:len(curServerCombo)-1] // pop last element
            }
        }
    }

    util_nCr(0, make([]map[string]string, 0))
    fmt.Println("Total number of shards: ", len(result))
    return result
}
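
The mapClone helper used above is not shown in this excerpt; assuming it just makes a shallow copy of the zone-count map, a minimal version would be:

// mapClone returns a shallow copy of a map[string]int so that the skew
// check can mutate its own copy of the zone counters.
func mapClone(src map[string]int) map[string]int {
    dst := make(map[string]int, len(src))
    for k, v := range src {
        dst[k] = v
    }
    return dst
}

On Go 1.21+, maps.Clone from the standard library does the same thing.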

Function to calculate skewness

func maxSkewSatisfied(combo []map[string]string, azCount map[string]int, maxSkew int) bool {
    minCount := math.MaxInt16

    // azCount is just a map of AZs and the count of servers in those
    // AZs. We pass it explicitly since we want ALL zones to be counted
    // when calculating the skew.
    // For example, with maxSkew=1 and 3 zones A, B, C,
    // the input [{"server": "localhost:8080", "az": "A"}, {"server": "localhost:8081", "az": "A"}]
    // should be rejected, because zones B and C have zero servers.

    // calculate server count of each region within the combo
    for _, server := range combo {
        if az, ok := server["az"]; ok {
            azCount[az] += 1
        } else {
            fmt.Printf("\nUnable to fetch region of server %s; so rejecting the combo\n", server["server"])
            return false
        }
    }

    // calculate global minimum
    for _, v := range azCount {
        if v < minCount {
            minCount = v
        }
    }

    // Since Go doesn't have an inbuilt function to calculate abs() for ints
    absInt := func(x int) int {
        if x < 0 {
            return x * -1
        } else {
            return x
        }
    }

    // If the difference between the server count and the global minimum
    // is at most maxSkew for every zone, the constraint is satisfied.
    // Otherwise, return false.
    for _, v := range azCount {
        skew := absInt(v - minCount)
        if skew > maxSkew {
            return false
        }
    }

    return true
}
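
As a quick illustration (using the two zones from the next section and assuming maxSkew = 1), a same-zone pair is rejected while a cross-zone pair passes:

azCount := map[string]int{"zoneA": 0, "zoneB": 0}

sameZone := []map[string]string{
    {"server": "http://localhost:8090", "az": "zoneA"},
    {"server": "http://localhost:8091", "az": "zoneA"},
}
crossZone := []map[string]string{
    {"server": "http://localhost:8090", "az": "zoneA"},
    {"server": "http://localhost:8093", "az": "zoneB"},
}

fmt.Println(maxSkewSatisfied(sameZone, mapClone(azCount), 1))  // false: zone counts (2, 0), skew 2
fmt.Println(maxSkewSatisfied(crossZone, mapClone(azCount), 1)) // true: zone counts (1, 1), skew 0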

1.1.2 Running the code

The server list provided to the load balancer is shown below:

{
    "zones": [
        {
            "zoneA": ["http://localhost:8090", "http://localhost:8091", "http://localhost:8092"]
        },
        {
            "zoneB": ["http://localhost:8093", "http://localhost:8094", "http://localhost:8095"]
        }
    ]    
}
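
This JSON maps directly onto the []map[string][]string shape that generateZAServerCombos expects. A minimal sketch of loading it (the struct name, file handling and error handling here are assumptions, not necessarily the actual code):

import (
    "encoding/json"
    "os"
)

// zoneConfig mirrors the JSON file above.
type zoneConfig struct {
    Zones []map[string][]string `json:"zones"`
}

// loadZones reads the config file and returns the zone/server list
// in the shape expected by generateZAServerCombos.
func loadZones(path string) ([]map[string][]string, error) {
    raw, err := os.ReadFile(path)
    if err != nil {
        return nil, err
    }
    var cfg zoneConfig
    if err := json.Unmarshal(raw, &cfg); err != nil {
        return nil, err
    }
    return cfg.Zones, nil
}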

So, for 2 zones (A & B) with 3 servers per zone and a shard size of 2, out of the 6C2 = 15 possible pairs the 6 same-zone pairs are rejected by the skew check, leaving 9 eligible shards, viz: (A1, B1), (A1, B2), (A1, B3), (A2, B1), (A2, B2), (A2, B3), (A3, B1), (A3, B2), (A3, B3)

Run the load balancer & notice the list of allocated shards below

$ go run ss-za-main.go -shardSize 2
Total number of shards:  9
[map[az:zoneA server:http://localhost:8090] map[az:zoneB server:http://localhost:8093]]
[map[az:zoneA server:http://localhost:8090] map[az:zoneB server:http://localhost:8094]]
[map[az:zoneA server:http://localhost:8090] map[az:zoneB server:http://localhost:8095]]
[map[az:zoneA server:http://localhost:8091] map[az:zoneB server:http://localhost:8093]]
[map[az:zoneA server:http://localhost:8091] map[az:zoneB server:http://localhost:8094]]
[map[az:zoneA server:http://localhost:8091] map[az:zoneB server:http://localhost:8095]]
[map[az:zoneA server:http://localhost:8092] map[az:zoneB server:http://localhost:8093]]
[map[az:zoneA server:http://localhost:8092] map[az:zoneB server:http://localhost:8094]]
[map[az:zoneA server:http://localhost:8092] map[az:zoneB server:http://localhost:8095]]

serverShardPool: len=9 cap=16
2023/06/04 18:23:08 Load Balancer started at :3030

Request made by tenant: b
Tenant not found (0). Creating one

1.1.3 Edge Cases / Improvements

  • [EdgeCase] Load balancer as a SPOF: Currently, crucial data is stored in memory, and this has 2 problems: a) only one server can be run for the load balancer, which makes it a single point of failure; b) if the server crashes, all in-memory data is lost.

    • Solution: Move the data to a datastore and make the LB stateless.
  • [Improvement] Location awareness for serving: Right now, forwarding requests to the server is not based on client location.

    • Solution: Among other things, the load balancer can be modified to route the request to the nearest server from the client based on the client's Request Headers.
  • [Improvement] Generalisation to N-dimensional shard distribution: Current code is specific to a single dimension (i.e. zone awareness) but for a more general system, the algorithm should be able to distribute servers in a shard by considering multiple dimensions.

  • [EdgeCase] What if an entire zone went away?

    • Current code's approach: Nothing can be done in this case. Servers in the other zones will keep serving requests. New servers cannot simply be added to the shard, since that would violate the skewness constraint.

    • Potential solution: Add new servers in other regions. Implement a mechanism to mark a zone as unhealthy and, in that scenario, trigger a rebalancing mechanism that drops the unhealthy zone's servers from the shards. The skewness constraint will then no longer be violated.

1.2 Brief peek at other existing implementations

Route53 Infima Library

Source: https://github.com/awslabs/route53-infima/tree/master

The Infima library has the idea of a "Lattice": an N-dimensional container class for distributing shards based on various constraints. For example, Availability Zone and server type (high capacity vs low capacity) could be two such dimensions.

There are a bunch of sample implementations:

  • SimpleSignatureShuffleSharder.java: Probabilistic generation of shards based on a set of identifiers & a seed value.

  • StatefulSearchingShuffleSharder.java: Uses a datastore to perform a stateful search for a new shuffle shard, with guarantees about the number of items that may overlap between the new shuffle shard and all existing recorded shuffle shards.

K8s API Server

Source: https://pkg.go.dev/k8s.io/apiserver/pkg/util/shufflesharding

From my limited understanding of the source code:

  • Idea: A hash value with enough entropy (calculated from a couple of parameters) is used to probabilistically assign a set of servers from a given pool (the package emulates a "card shuffling & dealing" process). This approach is the same as the one taken in this blog post, except that retries are not made upon collisions.

  • Purpose: To pick the queue(s?) into which requests are enqueued (and from those queues, I guess, the requests are picked up and executed by backend workers).

Implementation in this blog

We pre-compute the allocation of servers to shards and, while doing so, we use a deterministic approach by enumerating all the possibilities.

Assigning a tenant to a particular shard is probabilistic: the tenant is hashed and a modulo operation maps the hash onto one of the pre-computed shards.
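
A minimal sketch of that tenant-to-shard mapping (the hash function and names below are illustrative; the actual code may use a different hash):

import "hash/fnv"

// pickShard maps a tenant ID onto one of the pre-computed shards by
// hashing the tenant and taking the result modulo the number of shards.
func pickShard(tenantID string, serverShardPool [][]map[string]string) []map[string]string {
    h := fnv.New32a()
    h.Write([]byte(tenantID))
    idx := h.Sum32() % uint32(len(serverShardPool))
    return serverShardPool[idx]
}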