Objective: Introducing a few methods to do random sampling in Python
1. The basics
random.random(), random.randrange(), random.shuffle, random.choice(), random.sample() import random #to produce deterministic results, set the seed #e.g., random.seed(10) # a random number between 0 and 1 print(random.random()) # 4 random numbers between 0 and 1 print([random.random() for _ in range(4)]) # a random integer in [0,3) print(random.randrange(3)) # a random integer in (3,6) print(random.randrange(3, 6)) # randomly shuffle an array: up_to_five = list(range(5)) random.shuffle(up_to_five) print(up_to_five) # randomly choose one element from a list my_fav_city = random.choice(["Evanston", "Chicago", "Thousand Oaks"]) print(my_fav_city)
2. Choose k random numbers from a list (with or without replacement)
source = list(range(10)) # randomly choose 7 numbers from a list (with replacement) choose7_withReplacement = [random.choice(source) for _ in range(7)] print(choose7_withReplacement)
# randomly choose 7 numbers from a list (without replacement) choose7_withOutReplacement = random.sample(source, 7) print(choose7_withOutReplacement)
3. Online sampling: Choose k random numbers from a stream of numbers with size n, where n can grow as we process; n can be very large so that the list won’t fit into memory, or n can be unknown.
Idea: Reservoir Sampling.
Below, we implement the most common method: Algorithm R
S[1..n] is the stream to process, R[1..k] will contain the returned k random samples.
#Assume we have a Stream object with the following methods: class Stream(object): def __init__(self, n):# n is the current length of the stream self.n = n def hasNext(self):#return 1 if the stream continues #assume the stream end at 100 (unknown to user) return 1 if self.n <= 100 else 0 def getNext(self):#return a random next number #assume the streamn consists of numbers 0,1,...,9 return random.randrange(10) if self.hasNext() else None def update(self):#increase the stream length by 1 self.n += 1 def leng(self): #return length of current stream return self.n
Now, let’s run the experiment:
def ReservoirSample(S, k): R = [None] * k i = S.leng() # length of stream so far #basically, i=0 to start with while S.hasNext(): #while the Stream continues to generate number if i < k: #the first k elements 0...k-1 get assigned first R[i] = S.getNext() else: #i>=k, replace previous assigned element j = random.randrange(0,i) #random from 0 to i-1 inclusive if j <= k-1: R[j] = S.getNext() S.update() # length of the stream is increased by 1 i = S.leng() return R def experiment(): #main function k = 50 #number of samples we want to pick S = Stream(0) #initialize the length of stream to 0 samples = ReservoirSample(S, k) print(samples) if __name__ == "__main__": experiment()
Analysis: Why does the above Algorithm R work? In other words, prove that the algorithm returns each value in the stream with probability k/i, when i is the current length of the stream:
- First of all, the latest ith element is selected with probability k/i (done).
- Now: P[the jth element from 1 to i-1 (inclusive) is replaced] = (k/i) (1/k) = 1/i. You can also see this directly from j = random.randrange(0,i), which returns a number between 0 and i-1 thus each number is chosen with probability 1/i.
- Thus, P[the jth element between 1 and i is NOT replaced] = 1-1/i = (i-1)/i
- Now, using induction: Assuming when the stream is at length i-1, each element is selected with probability k/(i-1) (verify that this is true for i=1,2). Now:
- P[the jth element survives after the latest round] = P[it survives in previous round] * P[it is not replaced in this round] = (k/(i-1)) ((i-1)/i) = k/i (done!)
Reference: Reservoir Sampling.