1#
2# Copyright 2015 The Android Open Source Project
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
16"""Script for sending data to a port.
17
18This script provides a simple shell interface for sending data at run-time to a
19port.
20
21Usage:
22    1. Choose a port to use. Use 'adb forward tcp:<port>
23    tcp:<port>' to forward the port to the device.
24    2. In a separate shell, build and push the test vendor library to the device
25    using the script mentioned in option A (i.e. without the --test-channel flag
26    set).
27    3. Once logcat has started, turn Bluetooth on from the device.
28    4. Run this program, in the shell from step 1,  the port, also from step 1,
29    as arguments.
30
31    scapy is the tool we use to build packets in Python.
32
33    >>> d = HCI_Hdr(type=1) / HCI_Command_Hdr(opcode = 0x1004) /
34    Raw(load='\x01')
35    >>> print(d)
36    <HCI_Hdr  type=Command |<HCI_Command_Hdr  opcode=0x1004 |<Raw  load='\x01'
37    |>>>
38    >>> raw(d)
39    '\x01\x04\x10\x01\x01'
40    >>> hexdump(d)
41    0000  0104100101                       .....
42
43
44    >>> pkt = HCI_Hdr('\x02\x02\x20\x0a\x00\x06\x00\x01\x00') /
45    L2CAP_CmdHdr(code=10, id=2, len=2) /L2CAP_InfoReq(type=2)
46    >>> pkt
47    <HCI_Hdr  type=ACL Data |<HCI_ACL_Hdr  handle=2 PB=0 BC=2 len=10 |<L2CAP_Hdr
48    len=6 cid=control |<L2CAP_CmdHdr  code=info_req id=2 len=2 |<L2CAP_InfoReq
49    type=FEAT_MASK |>>>>>
50    >>> pkt = HCI_Hdr(type="ACL Data") / HCI_ACL_Hdr(handle=2, PB=0, BC=2,
51    len=10) / L2CAP_Hdr(len=6, cid="control") / L2CAP_CmdHdr(code="info_req",
52    id=2, len=2) / L2CAP_InfoReq(type="FEAT_MASK")
53    >>> raw(pkt)
54    '\x02\x02 \n\x00\x06\x00\x01\x00\n\x02\x02\x00\x02\x00'
55    >>> hexdump(pkt)
56    0000  0202200A00060001000A0202000200   .. ............
57
58
59"""
60
61#!/usr/bin/env python3
62
63import binascii
64import cmd
65import random
66import socket
67import string
68import struct
69import sys
70from scapy.all import *
71
72DEVICE_NAME_LENGTH = 6
73DEVICE_ADDRESS_LENGTH = 6
74
75
76# Used to generate fake device names and addresses during discovery.
77def generate_random_name():
78    return ''.join(random.SystemRandom().choice(string.ascii_uppercase + \
79      string.digits) for _ in range(DEVICE_NAME_LENGTH))
80
81
82def generate_random_address():
83    return ''.join(random.SystemRandom().choice(string.digits) for _ in \
84      range(DEVICE_ADDRESS_LENGTH))
85
86
87class Connection(object):
88    """Simple wrapper class for a socket object.
89
90  Attributes:
91    socket: The underlying socket created for the specified address and port.
92  """
93
94    def __init__(self, port):
95        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
96        self._socket.connect(('localhost', port))
97        self._socket.setblocking(0)
98
99    def close(self):
100        self._socket.close()
101
102    def send(self, data):
103        self._socket.sendall(data)
104
105    def receive(self, size):
106        return self._socket.recv(size)
107
108
109class RawPort(object):
110    """Checks outgoing commands and sends them once verified.
111
112  Attributes:
113    connection: The connection to the HCI port.
114  """
115
116    def __init__(self, port):
117        self._connection = Connection(port)
118        self._closed = False
119
120    def close(self):
121        self._connection.close()
122        self._closed = True
123
124    def send_binary(self, args):
125        joined_args = ''.join(arg for arg in args)
126        print(joined_args)
127        packet = binascii.a2b_hex(joined_args)
128        if self._closed:
129            return
130        self._connection.send(packet)
131        received = self.receive_response()
132        received_bytes = bytearray(received)
133        print(raw(received_bytes))
134
135    def receive_response(self):
136        if self._closed:
137            return
138        size_chars = self._connection.receive(4)
139        if not size_chars:
140            print('Debug: No response')
141            return False
142        size_bytes = bytearray(size_chars)
143        response_size = 0
144        for i in range(0, len(size_chars) - 1):
145            response_size |= ord(size_chars[i]) << (8 * i)
146        response = self._connection.receive(response_size)
147        return response
148
149    def lint_command(self, name, args, name_size, args_size):
150        assert name_size == len(name) and args_size == len(args)
151        try:
152            name.encode('utf-8')
153            for arg in args:
154                arg.encode('utf-8')
155        except UnicodeError:
156            print('Unrecognized characters.')
157            raise
158        if name_size > 255 or args_size > 255:
159            raise ValueError  # Size must be encodable in one octet.
160        for arg in args:
161            if len(arg) > 255:
162                raise ValueError  # Size must be encodable in one octet.
163
164
165class RawPortShell(cmd.Cmd):
166    """Shell for sending binary data to a port.
167
168  """
169
170    def __init__(self, raw_port):
171        cmd.Cmd.__init__(self)
172        self._raw_port = raw_port
173
174    def do_send(self, args):
175        """Arguments: dev_type_str Add a new device of type dev_type_str.
176
177    """
178        self._raw_port.send_binary(args.split())
179
180    def do_quit(self, args):
181        """Arguments: None.
182
183    Exits.
184    """
185        self._raw_port.close()
186        print('Goodbye.')
187        return True
188
189    def do_help(self, args):
190        """Arguments: [dev_num [attr]] List the commands available, optionally filtered by device and attr.
191
192    """
193        if (len(args) == 0):
194            cmd.Cmd.do_help(self, args)
195
196
197def main(argv):
198    if len(argv) != 2:
199        print('Usage: python raw_port.py [port]')
200        return
201    try:
202        port = int(argv[1])
203    except ValueError:
204        print('Error parsing port.')
205    else:
206        try:
207            raw_port = RawPort(port)
208        except (socket.error, e):
209            print('Error connecting to socket: %s' % e)
210        except:
211            print('Error creating (check arguments).')
212        else:
213            raw_port_shell = RawPortShell(raw_port)
214            raw_port_shell.prompt = '$ '
215            raw_port_shell.cmdloop('Welcome to the RootCanal Console \n' + 'Type \'help\' for more information.')
216
217
218if __name__ == '__main__':
219    main(sys.argv)
220