1// compile 2 3// Copyright 2019 The Go Authors. All rights reserved. 4// Use of this source code is governed by a BSD-style 5// license that can be found in the LICENSE file. 6 7// This code failed on arm64 in the register allocator. 8// See issue 33355. 9 10package server 11 12import ( 13 "bytes" 14 "sync" 15) 16 17type client struct { 18 junk [4]int 19 mu sync.Mutex 20 srv *Server 21 gw *gateway 22 msgb [100]byte 23} 24 25type gateway struct { 26 cfg *gatewayCfg 27 outsim *sync.Map 28} 29 30type gatewayCfg struct { 31 replyPfx []byte 32} 33 34type Account struct { 35 Name string 36} 37 38type Server struct { 39 gateway *srvGateway 40} 41 42type srvGateway struct { 43 outo []*client 44} 45 46type subscription struct { 47 queue []byte 48 client *client 49} 50 51type outsie struct { 52 ni map[string]struct{} 53 sl *Sublist 54 qsubs int 55} 56 57type Sublist struct { 58} 59 60type SublistResult struct { 61 psubs []*subscription 62 qsubs [][]*subscription 63} 64 65var subPool = &sync.Pool{} 66 67func (c *client) sendMsgToGateways(acc *Account, msg, subject, reply []byte, qgroups [][]byte) { 68 var gws []*client 69 gw := c.srv.gateway 70 for i := 0; i < len(gw.outo); i++ { 71 gws = append(gws, gw.outo[i]) 72 } 73 var ( 74 subj = string(subject) 75 queuesa = [512]byte{} 76 queues = queuesa[:0] 77 mreply []byte 78 dstPfx []byte 79 checkReply = len(reply) > 0 80 ) 81 82 sub := subPool.Get().(*subscription) 83 84 if subjectStartsWithGatewayReplyPrefix(subject) { 85 dstPfx = subject[:8] 86 } 87 for i := 0; i < len(gws); i++ { 88 gwc := gws[i] 89 if dstPfx != nil { 90 gwc.mu.Lock() 91 ok := bytes.Equal(dstPfx, gwc.gw.cfg.replyPfx) 92 gwc.mu.Unlock() 93 if !ok { 94 continue 95 } 96 } else { 97 qr := gwc.gatewayInterest(acc.Name, subj) 98 queues = queuesa[:0] 99 for i := 0; i < len(qr.qsubs); i++ { 100 qsubs := qr.qsubs[i] 101 queue := qsubs[0].queue 102 add := true 103 for _, qn := range qgroups { 104 if bytes.Equal(queue, qn) { 105 add = false 106 break 107 } 108 } 109 if add { 110 qgroups = append(qgroups, queue) 111 } 112 } 113 if len(queues) == 0 { 114 continue 115 } 116 } 117 if checkReply { 118 checkReply = false 119 mreply = reply 120 } 121 mh := c.msgb[:10] 122 mh = append(mh, subject...) 123 if len(queues) > 0 { 124 mh = append(mh, mreply...) 125 mh = append(mh, queues...) 126 } 127 sub.client = gwc 128 } 129 subPool.Put(sub) 130} 131 132func subjectStartsWithGatewayReplyPrefix(subj []byte) bool { 133 return len(subj) > 8 && string(subj[:4]) == "foob" 134} 135 136func (c *client) gatewayInterest(acc, subj string) *SublistResult { 137 ei, _ := c.gw.outsim.Load(acc) 138 var r *SublistResult 139 e := ei.(*outsie) 140 r = e.sl.Match(subj) 141 return r 142} 143 144func (s *Sublist) Match(subject string) *SublistResult { 145 return nil 146} 147 148