Matrixmultiplikation i stor skala med pyspark (eller - hvordan man matcher to store datasæt med firmanavne)

Spark og pyspark har vidunderlig support til pålidelig distribution og parallelisering af programmer samt support til mange grundlæggende algebraiske operationer og maskinlæringsalgoritmer.

I dette indlæg beskriver vi motivationen og midlerne til at udføre matchning af navn ved navn ved to store datasæt med firmanavne ved hjælp af Spark.

Motivation først

Vores mål er at matche to store sæt firmanavne. Vi ser på to lange lister med firmanavne, liste A og liste B, og vi sigter mod at matche virksomheder fra A til virksomheder fra B.

Vi ville typisk have noget som dette:

Liste A | Liste B
---------------------
GOOGLE INC. | Google
MEDIUM.COM | Medium Inc
Amazon labs | Amazon
Google, inc |
Yahoo |
             | Microsoft

I dette eksempel er vores mål at matche både GOOGLE INC. Og Google inc (fra liste A) til Google (fra liste B); og til at matche MEDIUM.COM til Medium Inc; og Amazon-laboratorier til Amazon osv. ...

Ser man på dette enkle eksempel, skiller sig et par ting ud:

  • Det er muligt for mere end et firma fra A at matche et enkelt firma fra B; det er et mange-til-en-forhold.
  • Firmanavne er i de fleste tilfælde lette for mennesker at matche (f.eks. Amazon-labs til Amazon), men er ikke så lette for computere at matche (hvordan ved computeren, at "labs" i dette tilfælde er ubetydelige, og at "Amazon" bare er en forkortelse til “Amazon labs”?)
  • Ikke alle virksomheder fra A har kampe i B, og ikke alle virksomheder fra B matches fra A. I vores eksempel matches Yahoo fra liste A ikke til noget andet selskab på B, og Microsoft fra B matches heller ikke til noget selskab på A .
  • Ethvert element fra liste A skal højst have en kamp fra B. Det modsatte er ikke sandt, som anført, mange virksomheder fra A kunne matches til et enkelt firma på B.

Første forsøg - trivielt match

OK, først troede vi, at vi ville prøve den mest enkle og trivielle løsning for at se, hvor godt det fungerer, hvis ikke for noget andet, i det mindste for at etablere en basislinje for fremtidige forsøg. Den mest enkle ting at gøre er bare ikke-følsomme strenge ligningstest. Bare match strengene fra A til strengene fra B.

Præcision og husk

Der er to relevante foranstaltninger, man skal se på: Præcision og husk. Præcision er "hvor mange fejltagelser har vi (ikke) lavet", eller med andre ord - i betragtning af alle kampe, hvor mange af dem der faktisk var korrekte kampe - Præcise kampe. Så præcision handler om falske positiver.

Husk på den anden side er "hvor mange kampe der skulle have fundet, men blev forpasset". Så tilbagekaldelse handler om de falske negativer.

Det første trivielle forsøg var som forventet høj på præcision, men lavt på tilbagekaldelse. Hvis vi ser på en kort liste over eksempler på virksomheder, ville den matche nulelementer fra A til B. Det er meget dårlig tilbagekaldelse, men 100% præcision ;-). I den virkelige verden vil det naturligvis matche lidt mere end nul, men det er let at se, at huskningen på grund af mange små mulige variationer i firmanavne vil forblive lav.

En simpel forbedring til det ville være fjernelse af stop-ord.

Stop ord

Hvad er stop-ord? Stop-ord er ord, der fjernes før processioner af forskellige NLP-algoritmer, fordi de ikke tilføjer oplysninger, normalt tilføjer de bare støj. I almindelige engelske stop-ord er normalt “for”, “for” “hvis” osv. På sproget, dette er meget almindelige ord, der bliver brugt meget, men for mange NLP- og IR-algoritmer tilføjer de ikke oplysninger. De sørger for korrekte syntaktiske sætninger, og i mange tilfælde påvirker de semantikken, men på niveau med mange NLP-processorer, som ikke ser på den faktiske syntaks, er de meningsløse.

I vores tilfælde er stop-ordene ikke "hvis", "af" eller "for", som er typiske på engelsk, men de er "inc" og "llc" fra virksomhedsudvidelsen. Så vores enkle forbedring er blot at fjerne alle disse virksomhedsudvidelser og prøve den enkle strengekvation igen.

Dette hjalp faktisk, og som du kan se i vores eksempel, hjalp dette med at matche “Google inc.” Til “Google” og med enkel fjernelse af tegnsætning og korrekt tokenisering matcher vi også “Google, inc” til “Google”. Men det stemmer stadig ikke overens med "Amazon labs" og "Amazon" b / c "labs" er ikke et stop-ord i den forstand, at det ikke er en almindelig virksomhedsudvidelse. Som det viser sig, er "Amazon-labs" ikke bare et tilfældigt eksempel, mange firmanavne har disse variationer i deres navne, der er manifesteret i et datasæt, men ikke manifesteret i andre datasæt. Konklusion: Vi er nødt til at finde en måde at “se ud over det”, ignorere “labs” i “Amazon labs”.

Lad os møde videnskab.

Videnskaben

Det, vi ser på her, er problemet med at matche N-dokumenter fra liste A til M-dokumenter på liste B, i mange-til-en-forhold. Men vores matchende algoritme skal være "smart" i den forstand, at den skal være i stand til at skelne mellem "vigtige ord" og "ikke vigtige ord". Vi er nødt til at finde en måde at fortælle computeren om, at “labs” i “amazon labs” er ubetydelig, men “amazon” er virkelig betydningsfuld. Vi ville også trivielt markere navnene i mindre tokens ved at opdele efter mellemrum, tegnsætning osv., Så “medium.com” ville blive opdelt i “medium” og “com”.

Videnskab til redning!

TF-IDF

Til dette formål bruger vi et fælles skema i informationsindhentningsteori kaldet TF-IDF. TF-IDF står for Term Frequency - Inverteret dokumentfrekvens. Termfrekvens betyder simpelthen "hvor mange gange dette ord vises i dette dokument" (vores dokumenter er bare firmanavne, så de er meget korte "dokumenter"). Så i tilfælde af "amazon labs" har vi kun to ord i dokumentet "amazon" og "labs", og deres frekvens er simpelthen 1 og 1. (Hvis firmaets navn bare tilfældigvis var "amazon labs" amazon ”, så ville antallet have været 2 for“ amazon ”og 1 for“ labs ”.) Det er, hvad TF handler om, ganske enkelt: Tæl frekvensen af ​​termer i dokumentet.

Omvendt dokumentfrekvens er den rigtige aftale. Den omvendte dokumentfrekvens ser på alle "dokumenter" (også korpus, alle firmanavne) og tester, hvor ofte ordet "laboratorier" vises i dem alle. Hvis ordet “amazon” kun vises i et enkelt dokument, betyder det, at “amazon” er et markant ord, men hvis ordet “labs” vises i mange andre dokumenter (f.eks. Bruger mange virksomheder ordet “labs” som en del af deres navn) betyder det, at ordet “labs” er ubetydeligt. IDF er netop det - hvor mange dokumenter vises ordet i.

TF-IDF er TF for udtrykket divideret med udtrykket IDF. Det giver en god måling af, hvor vigtige eller hvor betydningsfulde ord er i sammenhæng med specifikke dokumenter.

Det er let at beregne TF-IDF-matrix til et sæt dokumenter. Der er færdige biblioteker, der gør det, og vi brugte scikit-lear's implementering til det.

TF-IDF-matrixen er en todimensionel matrix, hvor rækkerne repræsenterer dokumenter (i vores tilfælde - firmanavne), og kolonnerne repræsenterer unikke symboler (eller ord). Hvis vi ønskede at opbygge TF-IDF-matrixen i vores lille korpus fra liste A ville det se sådan ud (efter fjernelse af stop-ord, tegnsætning og nedskæring af alt):

           | google | medium | com | yahoo | amazon | laboratorier
-------------------------------------------------- ---------
GOOGLE INC. | 1 0 0 0 0 0 0
MEDIUM.COM | 0 .77 .63 0 0 0
Amazon labs | 0 0 0 0 .7 .7
Google, inc | 1 0 0 0 0 0 0
Yahoo | 0 0 0 1 0 0
com | 0 0 1 0 0 0

Her er koden:

fra sklearn.feature_extraction.text import TfidfVectorizer
matrix = vectorizer.fit_transform (['GOOGLE', 'MEDIUM.COM', 'Amazon labs', 'Google', 'Yahoo', 'com'])

Matrixen, der blev oprettet, er NxM, hvor N = antal virksomheder og M = antal unikke tokens.

Du vil bemærke, at vi tilføjede et andet (sammensat) firma med navnet "com". Vi gjorde det for at demonstrere en vigtig egenskab ved TF-IDF. Vi bruger TF-IDF for at skelne mellem markante og ubetydelige symboler i dokumenterne. Et markant token i et dokument er et token, der ikke kun vises i dokumentet ofte, men som også er relativt sjældent i hele korpuset. Hvis et udtryk vises mange gange i korpuset, bliver det mindre vigtigt for dette specifikke dokument. Vi tilføjede det sammensatte firma "com", så "Medium" inden for "Medium.com" bliver mere betydningsfuldt. (Du vil bemærke, at “Medium” -vægt er 0,77, mens “com” -vægt er 0,63, og det skyldes udseendet af “com” i et andet dokument, hvorfor IDF er lavere).

I den virkelige situation ville du selvfølgelig have snesevis eller hundreder af firmanavne med tokenet "com" eller "labs", så du vil se en væsentlig forskel mellem "Medium" og "com" i navnet Medium.com.

Kosinelighed

Det næste trin efter beregning af TF-IDF-matrixen for begge sider (begge lister A og B for virksomheder) er at multiplicere matrixerne.

At multiplicere matrixer giver en interessant foranstaltning kaldet Cosine-ligheden. Kosinus-ligheden er en simpel lighedsmåling, der spænder mellem 0 og 1. En værdi på 1 indikerer identiske elementer, og en hastighed på 0 indikerer helt forskellige elementer (ligesom den kosinus-trig-funktion gør). Multiplikation af matrixerne giver kosinus-ligheden mellem hvert element på liste A til hvert element på liste B. Faktisk multiplicerer vi A med B.T (B.transpose), så dimensionerne passer. Det interessante ved kosinus-lighed mellem TF-IDF-matrixer er, at resultatet er en matrix af ligheder mellem hvert element i A til hvert element i B, mens man tager højde for betydningen af ​​symboler i navnene. Normalt betyder et resultat af> .8 et gyldigt match.

Heldigvis giver python-pakken sklearn en simpel cosinus_similaritetsfunktion, der accepterer to matrixer og resulterer i kosinus-ligheden mellem disse to. Her er nogle demokoder:

fra sklearn.feature_extraction.text import TfidfVectorizer
fra sklearn.metrics.pairwise import cosine_similarity
a = vectorizer.fit_transform (['aa', 'bb', 'aa bb', 'aa aa bb'])
b = vectorizer.fit_transform (['aa', 'bb'])
kosimilariteter = cosine_similarity (a, b)

Resultatet er en matrix af ligheder mellem hvert element i A til hvert element i b.

           aa | bb
----------------------
aa | 1 | 0
bb | 0 | 1
aa bb | .7 | 0,7
aa aa bb | .89 | .44

Som forventet er ordet "aa" fra a meget lig ordet "aa" fra b (bemærk 1). Du kan også se, at “aa bb” ligner både “aa” og “bb”, og det giver også mening. Og til sidst vil du bemærke, at "aa aa bb" er større lighed med "aa" end det gør med "bb". Alt dette giver mening.

Lad os opsummere videnskaben her. Først tager vi to lister med dokumenter, og for hvert sæt beregner vi dets TF-IDF-matrix *. Derefter multiplicerer vi de to matrixer for at finde ud af deres kosinus-lighed, som er en matrix, der beskriver ligheden mellem hvert dokument i A til hvert dokument i B.

Anden forsøg - numpy matrixmultiplikation

Efter at have set videnskaben, vil vi prøve dette i praksis. Det næste forsøg er at indlæse alle virkelige virksomheder fra liste A og indlæse alle virkelige virksomheder fra liste B og multiplicere pokkerne ud af dem ved hjælp af funktionen cosine_similatiry.

Nemmere sagt end gjort.

I lille skala fungerer dette ganske enkelt, og det fungerer meget pænt. For eksempel med et par tusinde virksomheder på hver side, der ville fungere. Men med vores datasæt, hvor vi har et par hundrede tusinder navne på hver liste, op til et par millioner, bliver dette udfordrende.

Det er nemt at beregne TF-IDF, selv med så store datasæt, på en enkelt vært (min bærbare computer kører så let på få sekunder). Men at multiplicere matrixerne er den virkelige udfordring.

Lokal multiplikation skaleres ikke

Lad os antage, at vi har 1M (10⁶) navne på hver liste. Hver matrix ville derefter dreje sig om størrelsesordenen 10⁶ x 10⁶ (da antallet af unikke tokens svarer til antallet af virksomheder på grund af unikke navne). Oprettelse af en 10⁶ x 10⁶ matrix i hukommelsen betyder, at 10¹² flyder. En python-float tager 16 bytes, så vi ender med 16 * 10¹² bytes, som oversættes til ~ 4PT (fire petabytes) RAM. Kan vi opbevare en 4PT matrix i hukommelsen? Nå, selvfølgelig ikke, i det mindste ikke på min lurvede laptop. Men - der er et trick. Vi behøver ikke at holde det hele i hukommelsen. Husk, at selv om matrixen er 1M på 1M, er den i praksis for det meste fyldt med nul. Så hvorfor skulle vi gidde at bevare alle disse nuller i hukommelsen? Vi kan i stedet bruge en sparsom repræsentation af matrixen, hvor vi i stedet for at holde den todimensionelle matrix i hukommelsen ved hjælp af arrays af matriser, vi i stedet kun kan holde styr på koordinaterne for ikke-nul-elementerne og antage, at alle de andre bare er nuller. Det er fantastisk til at bevare fodaftryk med lav hukommelse såvel som til at udføre hurtige matrixmultiplikationsoperationer. Og faktisk er det præcis, hvad sklearn allerede gør. At beholde matrixerne som Sparse Matrices betyder, at vi kun skal tildele omkring 1M flydende (værdier) plus 2M heltal (indeks for ikke-nul elementer), men alt i alt handler det om 24Mbyte, hvilket er ganske let.

Men - at multiplicere de to matrixer, selv om de er sparsomme, ville betyde mindst 10¹² operationer (hvis vi er smarte med nulerne). Nu er det lidt sværere. Og selvom numpy (som ligger under sklearn) er meget god til sådan hurtig matematik, er denne ting lidt udfordrende, selv for numpy.

Det prøvede vi - simpelthen at multiplicere disse to matrixer. Det fungerer godt til lille nok matrixstørrelse, men ved nogle tal (som er meget mindre end hvad vi ønsker) begyndte det at mislykkes og løbe tør for hukommelse. Nu kunne vi have udarbejdet dette ved at opdele en af ​​matrixerne til mindre bidder og køre et tal eller multiplikationer efter hinanden og derefter opsummere alle ting. Men denne type mindede os om, at vi allerede kender den slags system, der gør det, det kaldes Spark.

Tredje forsøg - Multiplikation af gnistmatrix

Spark er fantastisk til stærkt paralleliserede hukommelseskrævende beregninger, og se og se, den har en BlockMatrix-datatype, der implementerer en multiplikationsoperation. Ligner nøjagtigt hvad vi ledte efter! OK, så vi opretter TF-IDF-matrixer og konverterer dem til Sparks BlockMatrix og kører a.multiply (b.transpose ()), hvilket er mere eller mindre, hvad cosine_similatiry gør.

# Pseudokode ...
a_mat = tfidf_vect.fit_transform ([..., ..., ...])
b_mat = tfidf_vect.fit_transform ([..., ..., ...])
a_block_mat = create_block_matrix (a)
b_block_mat_tr = create_block_matrix (b.transpose ())
cosimilarities = a_block_mat.multiply (b_block_mat_tr)

Dette synes let nok, og det er det virkelig. Men der er selvfølgelig et "men" ... denne ting, selvom den er enkel og fungerer korrekt fra matematisk synspunkt - ja, den skalerer ikke ... Vi er i stand til at multiplicere store matrixer, men ikke så store, som vi gerne vil . Vi prøvede at lege med blokstørrelser osv. Desværre, for store nok indgange mislykkes det med enten hukommelsesfejl eller bare lange løb, der aldrig slutter (timer og timer).

Hvad er problemet? Kan ikke gnist skala?

Gnist kan selvfølgelig skaleres. Men du skal bruge det klogt, dumt ... Problemet med BlockMatrix er, at Spark konverterer matrixens sparse blokke til tætte (sub) matrixer for at implementere multiplikationsoperationen. Og selvom det meste af vores matrix er nuller, ville gnist stadig konvertere alle disse nuller til tæt repræsentation, som enten ville forbruge for meget hukommelse, eller hvis vi holder størrelsen på blokke små, ville det resultere i for mange operationer, repartitioner osv. Og køre for evigt.

Gnist understøtter sparse matrixer, men disse matrixer implementerer ikke multiplikation (aka dot) -operationen, og den eneste distribuerede matrix, der implementerer multiplikationsoperationen fra skrivningstidspunktet, er BlockMatrix, som som nævnt omdanner den sparsomme repræsentation til tæt repræsentation, før du multiplicerer dem. Vi skal bemærke, at der havde været diskussioner i gnistfællesskabet om måder at implementere distribueret spredt matrixmultiplikation, dog som nævnt - på dette tidspunkt blev dette ikke implementeret endnu.

BlockMatrix.multiply () mislykkedes. Hvad er det næste?

Fjerde forsøg - og vinderen er ...

Vores fjerde og sidste forsøg var vellykket. Ideen er at blande og matche Spark med følelsesløs. Vores test viser, at numpy er i stand til at multiplicere en mindre matrix med en større matrix, så hvis vi kun tager et lille stykke matrix A og multiplicerer det med matrix B, ville det fungere og numpy ville ikke eksplodere. Og hvis du husker dine algebra-lektioner, kan multiplikation af matrixer udføres vektor for-vektor så algebraisk-vis, at det stadig ville være korrekt. Tanken er at opdele kun en af ​​matrixerne i mindre stykker og derefter lade hver gnistarbejder køre multiplikationen på sin del og derefter returnere netop konklusionen, f.eks. konklusionen kan være, at navnet ved A [13] stemmer overens med navnet på B [21] osv.

Broadcast og Parallelize til redning

Spark har to nyttige muligheder: kringkast og paralleliseres. Broadcast udsender ganske enkelt de nøjagtige samme data til alle arbejdstagere. Vi bruger transmission til at sende matrix B til alle arbejdstagere, så alle arbejdere har den komplette B-matrix. Paralleliser bunker dataene i partitioner og sender hver partition til en anden arbejdstager. Vi bruger parallelize til at sende bunker af A til arbejderne, så hver arbejdstager har alle B, men bare en lille del af A.

Her er den generelle oversigt:

  1. Beregn TF-IDF-matrixer på driveren.
  2. Paralleliser matrix A; Broadcast matrix B
  3. Hver arbejdstager overlapper nu sin arbejdsdel ved at multiplicere sin del af matrix A med hele matrix B. Så hvis en arbejder arbejder på A [0:99], ville den multiplicere disse hundrede rækker og returnere resultatet af, siger A [13] ] matcher et navn, der findes i B [21]. Multiplikation udføres ved hjælp af numpy.
  4. Driveren ville samle alle resultaterne fra de forskellige arbejdstagere tilbage og matche indekserne (A [13] og B [21]) til de faktiske navne i det originale datasæt - og vi er færdige!

Denne metode fungerer meget godt, og når det kørte for første gang, var det så en dejlig overraskelse, at vi troede, det bare ikke virkede (men det gjorde…). Sammenlignet med de foregående metoder, der enten løb i timevis (og ikke afsluttedes) eller løbet tør for eller hukommelse, var denne metode i stand til at fuldføre sin beregning i rækkefølge på kun et par minutter. Naturligvis afhænger det af størrelsen på dataene og størrelsen på Sparks klynge, men alt i alt fungerede det rigtig godt.

Lige nu er den eneste flaskehals driveren, der beregner TF-IDF-matrixer, og på den front har vi stadig tonsvis af albuerum, fordi denne beregning stadig er ret let for sklearn. (side-note: Spark implementerer også distribueret TF-IDF-beregning, men vi behøvede ikke at bruge den).

Her er pseudokode for at illustrere vores løsning:

fra sklearn.feature_extraction.text import TfidfVectorizer
fra sklearn.feature_extraction.text import CountVectorizer
fra sklearn.metrics.pairwise import cosine_similarity
# disse vil realistisk blive læst fra filer eller dataframes.
a = ['google inc', 'medium.com', ...]
b = ['google', 'microsoft', ...]
stopwords = ['ltd', ...]
vect = CountVectorizer (stop_words = stopwords)
# dette kan gøres med mindre hukommelsesomkostninger ved hjælp af en generator
ordforråd = vect.fit (a + b). ordforråd_
tfidf_vect = TfidfVectorizer (stop_words = stopwords,
                             ordforråd = ordforråd)
a_mat = tfidf_vect.fit_transform (a)
b_mat = tfidf_vect.fit_transform (b)
a_mat_para = parallelize_matrix (a_mat, rows_per_chunk = 100)
b_mat_dist = broadcast_matrix (a_mat)
a_mat_para.flatMap (
        lambda submatrix:
        find_matches_in_submatrix (csr_matrix (delmatrix [1],
                                             form = delmatrix [2]),
                                   b_mat_dist,
                                   delmatrix [0]))
def find_matches_in_submatrix (kilder, mål, input_start_index,
                              tærskel = 0,8):
    cosimilarities = cosine_similarity (kilder, mål)
    for i, kosimilaritet i enumerate (kosimilariteter):
        cosimilarity = cosimilarity.flatten ()
        # Find det bedste match ved hjælp af argsort () [- 1]
        target_index = cosimilarity.argsort () [- 1]
        source_index = input_start_index + i
        lighed = cosimilarity [target_index]
        hvis kosimilaritet [target_index]> = tærskel:
            udbytte (kildeindex, målindeks, lighed)
def broadcast_matrix (mat):
    bcast = sc.broadcast ((mat.data, mat.indices, mat.indptr))
    (data, indekser, indptr) = bcast.value
    bcast_mat = csr_matrix ((data, indekser, indptr), form = mat.shape)
    returner bcast_mat
def parallelize_matrix (scipy_mat, rows_per_chunk = 100):
    [rækker, cols] = scipy_mat.shape
    i = 0
    submatrices = []
    mens jeg 

Du vil bemærke, at vi efter udsendelsen og paralleliserer, vi samler matrixen til scipy csr_matrix, hvilket er, hvad den stammer fra. Så hvad vi dybest set gør, er - vi serialiserer matrixerne over ledningen og samler dem derefter tilbage på den anden side, på arbejderne. Serialiseringen er effektiv, da vi kun har brug for at sende de ikke-nul elementer i den sparse matrix. Så for en matrix med 1M-elementer sender vi kun ca. 1M flydere sammen med 2M ints, som bestemt er i Sparks komfortzone.

Konklusion

Vi beskriver en metode til at finde lighed mellem to lister over strengene A og B, der beskriver firmanavne. Vi brugte TF-IDF og kosinus-lighed som en lighed faktor.

Dernæst viser vi forskellige forsøg på skalerbar implementering af matrixmultiplikation ved hjælp af gnist og den vindende metode, der kombinerer numpy matrixmultiplikation sammen med gnistens transmission og paralleliseringsfunktioner.

* Et fint punkt at nævne: ordforrådet for begge matrixer skal være det samme. Med andre ord skal antallet af rækker i begge matrixer være lige, og de skal have nøjagtigt den samme rækkefølge, f.eks. hver række repræsenterer et udtryk, og rækkefølgen af ​​rækkerne skal være nøjagtig den samme mellem matrix A og matrix B. Dette gøres let ved først at beregne ordforrådet og først derefter beregne TF-IDF som i følgende eksempel:

fra sklearn.feature_extraction.text import TfidfVectorizer
fra sklearn.feature_extraction.text import CountVectorizer
fra sklearn.metrics.pairwise import cosine_similarity
a = ['google inc', 'medium.com']
b = ['google', 'microsoft']
company_name_stopwords = frozenset (['ltd', 'llc', 'inc'])
vect = CountVectorizer (stop_words = company_name_stopwords)
ordforråd = vect.fit (a + b). ordforråd_
tfidf_vect = TfidfVectorizer (stop_words = firma_navn_stopwords,
                             ordforråd = ordforråd)
a_mat = tfidf_vect.fit_transform (a)
b_mat = tfidf_vect.fit_transform (b)
cosimilarities = cosine_similarity (a_mat, b_mat)