Sådan rxJava2: Series Intro og PublishProcessor

Jeg har tilbragt de sidste par dage sammenklemt i en ydmyg masse af tæpper i en computerstol foran min pc, gentagne gange med ansigt-plantning min vej gennem RxJava2 Javadoc og forsøgt at få en form for forståelse af det. Måske er jeg lidt svag, eller (forhåbentlig og tilsyneladende mere sandsynlig) RxJava2 er simpelthen et forvirrende bibliotek. Jeg har sprunget fra Reactive Streams-dokumentation til Rx.NET-designretningslinjerne for 2010 og gennem næsten hver side på reactivex.io, og stadig var det først, før jeg startede en kedelig prøve-og-fejl-proces, som jeg virkelig begyndte at få en forståelse af, hvordan dette bibliotek fungerer. Denne artikel er den første af mange, der dokumenterer min akavede snubler gennem RxJava2's dybder.

Hovedideen med denne serie er at gøre disse koncepter markant mere tilgængelige for andre, der er nye for RxJava 2. Måske har jeg uflaks, eller måske er dette bare ikke en "ting", men ingen af ​​de oplysninger, jeg kunne finde, syntes direkte nok. Så jeg agter at præsentere (efterhånden) alle koncepterne i RxJava2, når jeg lærer dem på den mest direkte og "instruktionsmæssige" måde, jeg kan.

Grundlæggende om PublisherProcessor

En PublishProcessor udvider klassen FlowableProcessor. Som udvidelse betyder det, at en PublisherProcessor fungerer både som udgiver og abonnent på samme tid. På grund af dette dobbeltformål er multicasting aktiveret. En PublishProcessor udsender varer til sine abonnenter, hvilket betyder, at alle abonnenter på denne PublishProcessor vil modtage alle emitterede emner, da de udsendes fra kilden [PublishProcessor].

En PublishProcessor er ideel, når du har brug for et forhold mellem kilde og forbruger, hvor der er en kilde og mange forbrugere af samme kilde.

instantiering

PublishProcessor  processor = PublishProcessor.create ();

Dette, i al sin enkelhed, er virkelig, hvordan du instantierer en PublishProcessor. I eksemplet ovenfor opretter syntaksen en PublishProcessor, der vil udsende emner af typen Objekt til dens abonnenter, skønt du kan gøre dette, hvad du har brug for, det skal være.

Hvem kan abonnere på PublishProcessors?

Den Reaktive Extensions-abonnent , det er der! Her er den mest basale, fungerende abonnentimplementering:

import org.reactivestreams.Subscribber;
import org.reactivestreams.Subscription;

offentlig klasse BaseSubscribber implementerer abonnent  {

    privat abonnementsabonnement;

    @Override
    public void onSubscribe (abonnementsabonnement) {
        System.out.println ("Ny abonnent");
        this.subscription = abonnement;
        subscription.request (1);
    }

    @Override
    public void onNext (Objekt o) {
        System.out.println ("Got:" + o);
        subscription.request (1);
    }

    @Override
    offentligt ugyldigt onError (kaste, der kan kastes) {
        throwable.printStackTrace ();
    }

    @Override
    public void onComplete () {
        System.out.println ( "Complete.");
    }
}

Der er få vigtige ting at bemærke i koden ovenfor. Først i onSubscribe-metoden skal du være sikker på at gemme en henvisning til abonnementsobjektet. Dette er, så du kan bruge abonnement.anmodning (1) i onNext-metoden. DETTE ER VIRKELIG VIGTIGT - sørg for at ringe til anmodning om abonnementet i onSubscribe-metoden - dette sikrer, at dataene flyder ordentligt, når onNext kaldes. Det er på samme måde som en rutschebane - onSubscribe svarer til at gå om bord på rutsjebanen og køre op den langsomme første stigning, men stoppe før det første fald. I denne analogi vil det første "drop" ikke ske, før det første onNext opkald foretages. Derefter kan dataene flyde frit, eller indtil onComplete / onError opstår.

I onNext-metoden er kroppen, hvor "magien" sker. Det er her du skal give forretningslogikken for alle dine behov. Det eneste krav er, at du ringer til abonnement.anmodning (lang) inden for det - dette sikrer, at dataene kan fortsætte med at flyde ordentligt.

Implementeringen af ​​onError-metoden ovenfor fortæller simpelthen, hvor du pooped i koden, og onComplete sender bare en grundlæggende konsolmeddelelse for at fortælle dig den (abonnementet med PublishProcessor i dette tilfælde) afsluttet med succes.

At sætte det sammen

Dette eksempel bruger BaseSubscribber-eksemplet ovenfra.

public static void main (String [] args) {

    PublishProcessor  processor = PublishProcessor.create ();
    BaseSubscriber-abonnentA = ny BaseSubscriber ();
    processor.subscribeActual (abonnentA);
    processor.onNext ("Nogle tekstlinjer");
    processor.onNext ("En anden tekstlinie");
    processor.onNext ("Endnu en tekstlinje");
    
    BaseSubscriber-abonnentB = ny BaseSubscriber ();
    processor.subscribeActual (abonnentB);
    processor.onNext ("Nogle andre tekstlinjer");
    processor.onNext ("Endelig tekstlinje");
    processor.onComplete ();

}

Alle opkald til onNext udsendes teknisk til "alle" abonnenter, men for de tre første opkald er der kun en abonnent, så output skal kun vises én gang. På det fjerde og femte opkald til onNext har vi tilføjet abonnentB, hvilket betyder, at vi skal se de to opkald gentages to gange i output. Og endelig sendes opkaldet til onComplete også til alle abonnenter, så fuldførelsesmeddelelsen skal også vises to gange (én gang for hver abonnent, hvis onCompleted blev kaldt).

Produktion

Ny abonnent
Fik: En vis tekstlinje
Fik: En anden tekstlinie
Fik: Endnu en tekstlinje
Ny abonnent
Fik: En anden tekstlinje
Fik: En anden tekstlinje
Fik: Sidste tekstlinje
Fik: Sidste tekstlinje
Komplet.
Komplet.

Som forventet resulterede kun de første tre opkald til onNext i, at der vises en enkelt tekstlinje, da der kun er en abonnent. På den femte linje i output ser vi meddelelsen "Ny abonnent" for at indikere, at der nu er to abonnenter. Og som forventet duplikeres de endelige produktionslinjer for antallet af abonnenter, det i øjeblikket har.

Konklusion

Dette er min første af mange artikler om RxJava2 - Jeg ved, det synes underligt at starte med noget så specifikt som PublishProcessor, men jeg har eksterne projekter, jeg arbejder med, med behov, som denne klasse kan håndtere meget godt. Afgørelsen var rent nødvendigt fra min side. Fremover vil jeg fokusere på mere basale koncepter og opbygge derfra. Håber, at dette var til en vis brug for nogen!