Bluetoothリアルタイム反応

簡単な紹介、または痛みは何ですか



最近、カスタムデバイスであまり良く設計されていないプロトコルを使用して、Bluetoothを操作するためのモジュールを備えたアプリケーションに積極的に取り組んでいます。 そうそう 問題。







私はアプリケーションの反応性の誠実なファンなので、ネットワークには解決策がないため、そのような問題を自分で解決する必要がありました。 絶対に。 Bluetoothデバイスを操作するための結果のアーキテクチャについてお話ししたいと思います。







ジェダイの危険



Bluetoothを使用する際に開発者が覚えておくべき最初の重要な点は、途中でパケットが破損する可能性があるということです。 また、ノイズを伴うこともあります。 そして、これは100万人に1人のケースではなく、そのような現象は非常に頻繁に発生する可能性があり、処理する必要があります。 それでも、Bluetoothは切断されたり、接続されなかったり、接続されているふりをしたりすることがありますが、実際にはこれは何も意味しないことがわかっています...







これらの問題を解決する例として、ヘッダー(最初のNバイト)を使用してタイプによって決定され、いくつかの単純なチェックサムを使用して検証されるイベントを処理するためのマイクロフレームワークを設計します。 コードを乱雑にしないために、プロトコルによるヘッダーのサイズは固定されていると想定しています。 すべてのパケットを2つのタイプに分割します。固定長のパケットと、別のバイトで送信される動的なパケットです。







設計



アプリケーションで発生する可能性のあるイベントの説明から始めましょう。 したがって、一般的な抽象化は、受け入れられた制限を考慮して、次のようになります。







sealed class Event { val headSize: Int = 2 abstract val head: ByteArray abstract fun isCorrupted(): Boolean //To be continued }
      
      





さらに、すべてのパッケージに対して定数プロパティのセットを定義したら、次の条件を何らかの形で形式化する必要があります。







  1. パケットは何らかのタイプに属していると考えます
  2. バッファにバイトを追加する必要があります。これまでのところ、パケットは
  3. アセンブリの条件が満たされていないため、バッファがクラッシュするはずです(この項目はセキュリティのためにさらに必要です。アプリケーションのテスト中にログを追加して、残りの条件の完全性を確認することをお勧めします)
  4. バッファからパケットを収集し、その有効性を確認しようとします


これらの4つの条件により、次の形式のインターフェイスが作成されます。







 interface EventMatcher { val headSize: Int fun matches(packet: ByteBuffer): Boolean fun create(packet: ByteBuffer): Event fun shouldBuffer(packet: ByteBuffer): Boolean fun shouldDrop(packet: ByteBuffer): Boolean }
      
      





提供するコンポーネントを作成する 便利だと思いますが、お任せします 既存のすべてのタイプのマッチへのプロキシインターフェイス、目立たないもの、catの下のコード:







プロキシマッチ
 class EventMatchersAdapter { private val matchers = mutableMapOf<KClass<out Event>, EventMatcher>() fun register(event: KClass<out Event>, matcher: EventMatcher) = apply { matchers.put(event, matcher) } fun unregister(event: KClass<out Event>) = apply { matchers.remove(event) } fun knownEvents(): List<KClass<out Event>> = matchers.keys.toList() fun matches(packet: ByteBuffer, event: KClass<out Event>): Boolean = matchers[event]?.matches(packet) ?: false fun shouldBuffer(packet: ByteBuffer, event: KClass<out Event>): Boolean = matchers[event]?.shouldBuffer(packet) ?: false fun shouldDrop(packet: ByteBuffer, event: KClass<out Event>): Boolean = matchers[event]?.shouldDrop(packet) ?: false fun create(packet: ByteBuffer, event: KClass<out Event>): Event? = matchers[event]?.create(packet) }
      
      





パッケージでは、特定のパッケージが破損しているかどうかを判断する方法を説明しています。 これはやや便利なアプローチであり、プロトコルの設計が不十分であるため、多くの問題を抱えることはありません。







固定長パッケージの例
 data class A(override val head: ByteArray, val payload: ByteArray, val checksum: Byte): Event() { companion object { //(two bytes of head) + (2 bytes of payload) + (byte of checksum) @JvmStatic val length = 5.toByte() @JvmStatic val headValue = byteArrayOf(0x00, 0x00) @JvmStatic val matcherValue = object: EventMatcher { override val headSize: Int = 2 override fun matches(packet: ByteBuffer): Boolean { if(packet.position() == 0) return true if(packet.position() == 1) return packet[0] == headValue[0] return packet[0] == headValue[0] && packet[1] == headValue[1] } override fun create(packet: ByteBuffer): A { packet.rewind() return A( ByteArray(2, { packet.get() }), ByteArray(2, { packet.get() }), packet.get() ) } override fun shouldBuffer(packet: ByteBuffer): Boolean = packet.position() < length override fun shouldDrop(packet: ByteBuffer): Boolean = packet.position() > length } } override fun isCorrupted(): Boolean = checksumOf(payload) != checksum override fun equals(other: Any?): Boolean { if(other as? A == null) return false other as A return Arrays.equals(head, other.head) && Arrays.equals(payload, other.payload) && checksum == other.checksum } override fun hashCode(): Int { var result = Arrays.hashCode(head) result = result * 31 + Arrays.hashCode(payload) result = result * 31 + checksum.hashCode() return result } }
      
      





動的長パッケージの例
 data class C(override val head: ByteArray, val length: Byte, val payload: ByteArray, val checksum: Byte): Event() { companion object { @JvmStatic val headValue = byteArrayOf(0x01, 0x00) @JvmStatic val matcherValue = object: EventMatcher { override val headSize: Int = 2 override fun matches(packet: ByteBuffer): Boolean { if(packet.position() == 0) return true if(packet.position() == 1) return packet[0] == headValue[0] return packet[0] == headValue[0] && packet[1] == headValue[1] } override fun create(packet: ByteBuffer): C { packet.rewind() val msb = packet.get() val lsb = packet.get() val length = packet.get() return C( byteArrayOf(msb, lsb), length, packet.take(3, length.toPositiveInt()), packet.get() ) } override fun shouldBuffer(packet: ByteBuffer): Boolean = when(packet.position()) { in 0..2 -> true else -> packet.position() < (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum) } override fun shouldDrop(packet: ByteBuffer): Boolean = when(packet.position()) { in 0..2 -> false else -> packet.position() > (packet[2].toPositiveInt() + 4) //increase by (2 bytes of head) + (1 byte of length) + (1 byte of checksum) } } } override fun isCorrupted(): Boolean = checksumOf(payload) != checksum override fun equals(other: Any?): Boolean { if(other as? C == null) return false other as C return Arrays.equals(head, other.head) && length == other.length && Arrays.equals(payload, other.payload) && checksum == other.checksum } override fun hashCode(): Int { var result = Arrays.hashCode(head) result = result * 31 + length.hashCode() result = result * 31 + Arrays.hashCode(payload) result = result * 31 + checksum.hashCode() return result } }
      
      





さらに-パケット読み取りアルゴリズム自体を説明する必要があります。







  1. いくつかの異なるタイプをサポート
  2. パッケージの損傷を回復する
  3. Flowableと友達になります


Subscriberインターフェースの背後に隠されたアルゴリズムの実装:







 class EventsBridge(private val adapter: EventMatchersAdapter, private val emitter: FlowableEmitter<Event>, private val bufferSize: Int = 128): DisposableSubscriber<Byte>() { private val buffers: Map<KClass<out Event>, ByteBuffer> = mutableMapOf<KClass<out Event>, ByteBuffer>() .apply { for(knownEvent in adapter.knownEvents()) { put(knownEvent, ByteBuffer.allocateDirect(bufferSize)) } } .toMap() override fun onError(t: Throwable) { emitter.onError(t) } override fun onComplete() { emitter.onComplete() } override fun onNext(t: Byte) { for((key, value) in buffers) { value.put(t) adapter.knownEvents() .filter { it == key } .forEach { if (adapter.matches(value, it)) { when { adapter.shouldDrop(value, it) -> { value.clear() } !adapter.shouldBuffer(value, it) -> { val event = adapter.create(value, it) if (!emitter.isCancelled && event != null && !event.isCorrupted()) { release() emitter.onNext(event) } else { value.clear() } } } } else { value.clear() } } } } private fun release() { for(buffer in buffers) buffer.value.clear() } }
      
      





使用する



単体テストを実行する例を考えてみましょう。







1つのタイプのパッケージの作業の簡単なテスト
 @Test fun test_single_fixedLength() { val adapter = EventMatchersAdapter() .register(Event.A::class, Event.A.matcherValue) val packetA = generateCorrectPacketA() val testSubscriber = TestSubscriber<Event>() Flowable.create<Event>( { emitter -> val bridge = EventsBridge(adapter, emitter) Flowable.create<Byte>({ byteEmitter -> for(byte in packetA) { byteEmitter.onNext(byte) } }, BackpressureStrategy.BUFFER).subscribe(bridge) }, BackpressureStrategy.BUFFER ) .subscribe(testSubscriber) testSubscriber.assertNoErrors() testSubscriber.assertValue { event -> event is Event.A && !event.isCorrupted() } }
      
      





多数のノイズ、いくつかのタイプのパッケージでテストする
 @Test fun test_multiple_dynamicLength_mixed_withNoise() { val adapter = EventMatchersAdapter() .register(Event.C::class, Event.C.matcherValue) .register(Event.D::class, Event.D.matcherValue) val packetC1 = generateCorrectPacketC() val packetD1 = generateCorrectPacketD() val packetD2 = generateCorruptedPacketD() val packetC2 = generateCorruptedPacketC() val testSubscriber = TestSubscriber<Event>() val random = Random() Flowable.create<Event>( { emitter -> val bridge = EventsBridge(adapter, emitter) Flowable.create<Byte>({ byteEmitter -> for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetC1) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetD1) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetD2) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } for(byte in packetC2) { byteEmitter.onNext(byte) } for(b in 0..100) { byteEmitter.onNext(random.nextInt().toByte()) } }, BackpressureStrategy.BUFFER).subscribe(bridge) }, BackpressureStrategy.BUFFER ) .subscribe(testSubscriber) testSubscriber.assertNoErrors() testSubscriber.assertValueCount(2) }
      
      





テストパッケージの生成
 private fun generateCorrectPacketB(): ByteArray { val rnd = Random() val payload = byteArrayOf( rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte() ) return byteArrayOf( Event.B.headValue[0], Event.B.headValue[1], payload[0], payload[1], payload[2], payload[3], checksumOf(payload) ) } private fun generateCorrectPacketC(): ByteArray { val rnd = Random() val payload = List(rnd.nextInt(16), { index -> rnd.nextInt().toByte() }).toByteArray() return ByteArray(4 + payload.size, { index -> when(index) { 0 -> Event.C.headValue[0] 1 -> Event.C.headValue[1] 2 -> payload.size.toByte() in 3..(4 + payload.size - 2) -> payload[index - 3] 4 + payload.size - 1 -> checksumOf(payload) else -> 0.toByte() } }) } private fun generateCorruptedPacketB(): ByteArray { val rnd = Random() val payload = byteArrayOf( rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte(), rnd.nextInt().toByte() ) return byteArrayOf( Event.B.headValue[0], Event.B.headValue[1], payload[0], payload[1], payload[2], payload[3], (checksumOf(payload) + 1.toByte()).toByte() ) } private fun generateCorruptedPacketC(): ByteArray { val rnd = Random() val payload = List(rnd.nextInt(16), { _ -> rnd.nextInt().toByte() }).toByteArray() return ByteArray(4 + payload.size, { index -> when(index) { 0 -> Event.C.headValue[0] 1 -> Event.C.headValue[1] 2 -> payload.size.toByte() in 3..(4 + payload.size - 2) -> payload[index - 3] else -> (checksumOf(payload) + 1.toByte()).toByte() } }) }
      
      





テストに使用される単純なチェックサム
 inline fun checksumOf(data: ByteArray): Byte { var result = 0x00.toByte() for(b in data) { result = (result + b).toByte() } return (result.inv() + 1.toByte()).toByte() }
      
      





そして、なぜこれがすべて必要なのですか?



この例では、可能性のある損傷を回避しながら、ほとんど任意のイベントを処理するときにモジュール性を維持することがどれだけ簡単かつ制約なしで可能であるかを示したいと思います。パケットとノイズの多い通信チャネル。







それでは、次は何ですか?



RxBluetoothの小さなラッパーを作成しましょう。これにより、さまざまなイベントをリッスンし、リアクティブスタイルでさまざまな接続を操作できます。







すべてのコードは、2つのサービスと1つのリポジトリという3つのコンポーネントセットに条件付きで分割できます。

当社のサービスはそれぞれ接続を提供し、接続データを処理し、リポジトリは特定の接続を処理するための抽象化を提供し、接続の暗黙的なフライウェイトとして機能します。







インターフェースはおよそ次のようになります。







 interface ConnectivityService { fun sub(service: UUID): Observable<DataService> } interface DataService { fun sub(): Flowable<Event> fun write(data: ByteArray): Boolean fun dispose() } interface DataRepository { fun sub(serviceUUID: UUID): Flowable<Event> fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean> fun dispose() }
      
      





そして、それに応じて、カットの下の実装







ConnectivityServiceImpl
 class ConnectivityServiceImpl(private val bluetooth: RxBluetooth, private val events: EventMatchersAdapter, private val timeoutSeconds: Long = 15L): ConnectivityService { override fun sub(service: UUID): Observable<DataService> = when(bluetooth.isBluetoothEnabled && bluetooth.isBluetoothAvailable) { false -> Observable.empty() else -> { ensureBluetoothNotDiscovering() bluetooth.startDiscovery() bluetooth.observeDevices() .filter { device -> device.uuids.contains(ParcelUuid(service)) } .timeout(timeoutSeconds, TimeUnit.SECONDS) .take(1) .doOnNext { _ -> ensureBluetoothNotDiscovering() } .doOnError { _ -> ensureBluetoothNotDiscovering() } .doOnComplete { -> ensureBluetoothNotDiscovering() } .flatMap { device -> bluetooth.observeConnectDevice(device, service) } .map { connection -> DataServiceImpl(BluetoothConnection(connection), events) } } } private fun ensureBluetoothNotDiscovering() { if(bluetooth.isDiscovering) { bluetooth.cancelDiscovery() } } }
      
      





DataServiceImpl
 class DataServiceImpl constructor(private val connection: BluetoothConnection, private val adapter: EventMatchersAdapter): DataService { override fun sub(): Flowable<Event> = Flowable.create<Event>({ emitter -> val underlying = EventsBridge(adapter = adapter, emitter = emitter) emitter.setDisposable(object: MainThreadDisposable() { override fun onDispose() { if(!underlying.isDisposed) { underlying.dispose() } } }) connection.observeByteStream().subscribe(underlying) }, BackpressureStrategy.BUFFER) override fun write(data: ByteArray): Boolean = connection.send(data) override fun dispose() = connection.closeConnection() }
      
      





DataRepositoryImpl
 class DataRepositoryImpl(private val connectivity: ConnectivityService): DataRepository { private val services = ConcurrentHashMap<UUID, DataService>() override fun sub(serviceUUID: UUID): Flowable<Event> = serviceOf(serviceUUID) .flatMap { service -> service.sub() } override fun write(serviceUUID: UUID, data: ByteArray): Flowable<Boolean> = serviceOf(serviceUUID) .map { service -> service.write(data) } override fun dispose() { for((_, service) in services) { service.dispose() } } private fun serviceOf(serviceUUID: UUID): Flowable<DataService> = with(services[serviceUUID]) { when(this) { null -> connectivity.sub(serviceUUID).doOnNext { service -> services.put(serviceUUID, service) }.toFlowable(BackpressureStrategy.BUFFER) else -> Flowable.just(this) } } }
      
      





そのため、最小限の行数で、通常は不気味な呼び出しの連鎖になったものを実行するか、ほぼ次のようにコールバックすることができます。







 repository.sub(UUID.randomUUID()) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe { event -> when(event) { is Event.A -> doSomeStuffA(event) is Event.B -> doSomeStuffB(event) is Event.C -> doSomeStuffC(event) is Event.D -> doSomeStuffD(event) } }
      
      





任意のデバイスからの4つのイベントをリッスンする11行、悪くないですか?)







結論の代わりに



読者の一人が情報源を見たいと思っているなら、彼らはここにいる







誰かが生のバイトからのパケットの形成に他のルールがどのように適合するかを見たい場合は、書き加えてください。







UPD:ReactiveXのオプションブリッジ、コルーチン、およびKotlinのクリーンな実装を使用して、 マイクロフレームワークで設計されています。








All Articles