1#!/usr/bin/python 2# 3# Copyright 2014 Google Inc. All rights reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); 6# you may not use this file except in compliance with the License. 7# You may obtain a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, 13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14# See the License for the specific language governing permissions and 15# limitations under the License. 16 17""" 18rappor_test.py: Tests for rappor.py 19""" 20import cStringIO 21import copy 22import math 23import random 24import unittest 25 26import rappor # module under test 27 28 29class RapporParamsTest(unittest.TestCase): 30 31 def setUp(self): 32 self.typical_instance = rappor.Params() 33 ti = self.typical_instance # For convenience 34 ti.num_cohorts = 64 # Number of cohorts 35 ti.num_hashes = 2 # Number of bloom filter hashes 36 ti.num_bloombits = 16 # Number of bloom filter bits 37 ti.prob_p = 0.40 # Probability p 38 ti.prob_q = 0.70 # Probability q 39 ti.prob_f = 0.30 # Probability f 40 41 def testFromCsv(self): 42 f = cStringIO.StringIO('k,h,m,p,q,f\n32,2,64,0.5,0.75,0.6\n') 43 params = rappor.Params.from_csv(f) 44 self.assertEqual(32, params.num_bloombits) 45 self.assertEqual(64, params.num_cohorts) 46 47 # Malformed header 48 f = cStringIO.StringIO('k,h,m,p,q\n32,2,64,0.5,0.75,0.6\n') 49 self.assertRaises(rappor.Error, rappor.Params.from_csv, f) 50 51 # Missing second row 52 f = cStringIO.StringIO('k,h,m,p,q,f\n') 53 self.assertRaises(rappor.Error, rappor.Params.from_csv, f) 54 55 # Too many rows 56 f = cStringIO.StringIO('k,h,m,p,q,f\n32,2,64,0.5,0.75,0.6\nextra') 57 self.assertRaises(rappor.Error, rappor.Params.from_csv, f) 58 59 def testGetBloomBits(self): 60 for cohort in xrange(0, 64): 61 b = rappor.get_bloom_bits('foo', cohort, 2, 16) 62 #print 'cohort', cohort, 'bloom', b 63 64 def testGetPrr(self): 65 bloom = 1 66 num_bits = 8 67 for word in ('v1', 'v2', 'v3'): 68 masks = rappor.get_prr_masks('secret', word, 0.5, num_bits) 69 print 'masks', masks 70 71 def testToBigEndian(self): 72 b = rappor.to_big_endian(1) 73 print repr(b) 74 self.assertEqual(4, len(b)) 75 76 def testEncoder(self): 77 # Test encoder with deterministic random function. 78 params = copy.copy(self.typical_instance) 79 params.prob_f = 0.5 80 params.prob_p = 0.5 81 params.prob_q = 0.75 82 83 # return these 3 probabilities in sequence. 84 rand = MockRandom([0.0, 0.6, 0.0], params) 85 86 e = rappor.Encoder(params, 0, 'secret', rand) 87 88 irr = e.encode("abc") 89 90 self.assertEquals(64493, irr) # given MockRandom, this is what we get 91 92 93class MockRandom(object): 94 """Returns one of three random values in a cyclic manner. 95 96 Mock random function that involves *some* state, as needed for tests that 97 call randomness several times. This makes it difficult to deal exclusively 98 with stubs for testing purposes. 99 """ 100 101 def __init__(self, cycle, params): 102 self.p_gen = MockRandomCall(params.prob_p, cycle, params.num_bloombits) 103 self.q_gen = MockRandomCall(params.prob_q, cycle, params.num_bloombits) 104 105class MockRandomCall: 106 def __init__(self, prob, cycle, num_bits): 107 self.cycle = cycle 108 self.n = len(self.cycle) 109 self.prob = prob 110 self.num_bits = num_bits 111 112 def __call__(self): 113 counter = 0 114 r = 0 115 for i in xrange(0, self.num_bits): 116 rand_val = self.cycle[counter] 117 counter += 1 118 counter %= self.n # wrap around 119 r |= ((rand_val < self.prob) << i) 120 return r 121 122 123if __name__ == "__main__": 124 unittest.main() 125