Sådan bruges API'er med Pandas og gemmer resultaterne i Redshift

Her er en nem tutorial til at hjælpe med at forstå, hvordan du kan bruge Pandas til at hente data fra et RESTFUL API og gemme i en database i AWS Redshift.

Nogle grundlæggende forståelser af Python (med forespørgsler, Pandas og JSON-biblioteker), REST-API'er, Jupyter Notebook, AWS S3 og Redshift ville være nyttige.

Målet med selvstudiet er at bruge de geografiske koordinater (længdegrad og breddegrad), der leveres i en CSV-fil til at kalde et eksternt API og vende geokode til koordinaterne (dvs. få placeringsoplysninger) og til sidst gemme svardataene i en Redshift-tabel. Bemærk, at du muligvis kræver cache-godkendelse fra API-dataleverandøren.

Vi indlæser CSV med Pandas, bruger Requests-biblioteket til at ringe til API, gemme svaret i en Pandas-serie og derefter en CSV, uploade det til en S3 Bucket og kopiere de endelige data til en Redshift-tabel. Trinene nævnt ovenfor er på ingen måde den eneste måde at nærme sig dette og opgaven kan udføres på mange forskellige måder.

Datasættet, vi vil bruge, indeholder landets befolkning som målt af Verdensbanken i 2013 og kan findes på nedenstående websted.

Lad os begynde.

Trin 1 - Download datasættet

Links til kolonneforklaring og datasæt

https://developer.here.com/documentation/geovisualization/topics/sample-datasets.html

http://js.cit.datalens.api.here.com/datasets/starter_pack/Global_country_populations_2013.csv

Den første ting er at downloade CSV-filen fra ovennævnte websted.

Trin 2 - Start Jupyter Notebook, og indlæs datasættet i hukommelsen med Python

Installer Jupyter Notebook (med Anaconda eller andet), og affyr den ved at indtaste følgende kommandoer i terminalen, mens du er i det bibliotek, du vil gemme den bærbare computer.

Start Jupyter Notebook

Forudsat at du har installeret de påkrævede biblioteker, lad os indlæse CSV-filen i hukommelsen.

importer pandaer som pd
importanmodninger
import json
importtid
fra pandas.io.json import json_normalize
 
df = pd.read_csv ('Global_country_populations_2013.csv')
df = df [['CountryName', 'lat', 'lon']]
df.head ()
Indlæs data i Pandas DataFrame

Vi har også importeret andre biblioteker, da vi har brug for dem senere. Metoden 'read_csv ()' læser din CSV-fil i en Pandas DataFrame. Bemærk, at du skal specificere stien til fil her, hvis den ikke er gemt i samme bibliotek. Vi trunkerer derefter DataFrame for kun at beholde de kolonner, vi har brug for hovedsageligt ["CountryName", "lat", "lon"].

Trin 3 - Definer funktion til at kalde API

Den API, vi vil bruge til den omvendte geokodning, er LocationIQ (fra Unwired Labs), der tilbyder gratis ikke-kommerciel brug med en hastighed på 60 opkald / min og 10.000 opkald pr. Dag.

def get_reverse_geocode_data (række):
    prøve:
        YOUR_API_KEY = 'xxxxxxxxxxx'
        url = 'https://eu1.locationiq.org/v1/reverse.php?key=' + YOUR_API_KEY + '& lat =' + str (række ['lat']) + '& lon =' + str (række [' lon ']) +' & format = json '
        
        response = (request.get (url) .text)
        response_json = json.loads (svar)
        time.sleep (0,5)
        returner svar_json
    
    undtagen undtagelse som e:
        hæve e

I ovenstående kode har vi defineret en funktion - get_reverse_geocode_data (række). Når du har tilmeldt dig, får du en API-nøgle, som du skal medtage her sammen med endepunktet eller URL og de nødvendige parametre, som du kan få fra dokumentationen. Parameteren 'række' henviser til hver række kolonner 'lat' og 'lon' i Pandas DataFrame, der vil blive sendt som input til API'et. Anmodningsbiblioteket bruges til at fremsætte en HTTPS GET-anmodning til url'en og modtage svaret ved hjælp af '.text' metoden.

Du kan bruge 'json.loads ()' til at konvertere svaret fra en JSON-streng til et JSON-objekt, som er let at arbejde med. Parameteren 'time.sleep (0.5)' bruges til at kontrollere opkaldene til API på grund af begrænsninger, der er indstillet af den gratis niveauplan (60 opkald pr. Minut). For kommercielle højvolumenplaner er dette ikke nødvendigt.

Trin 4 - Ring til funktionen ved hjælp af DataFrame-kolonnerne som parametre

df ['API_response'] = df.apply (get_reverse_geocode_data, axis = 1)
df [ 'API_response']. hoved ()
Kald funktionen med DataFrame-kolonner som input

Du kan bruge metoden ‘df.apply ()’ til at anvende funktionen på hele DataFrame. Parameteren 'axis = 1' bruges til at anvende funktionen på tværs af kolonnerne. Parameteren 'række', vi brugte tidligere, gør det muligt for os at henvise til en hvilken som helst kolonne i DataFrame, som vi ønsker at bruge som input, og tager kolonneværdien på den række som input til den bestemte udførelse.

Trin 5 - Normaliser eller flat JSON-responsen

Nu hvor du har modtaget JSON-svaret fra API, er det tid til at flade det i kolonner og vælge de felter, du ønsker at beholde.

new_df = json_normalize (df ['API_response'])
new_df = new_df [['lat', 'lon', 'display_name']]
new_df

Funktionen 'json_normalize ()' er fantastisk til dette. Du kan videregive den Pandas-serie, du ønsker at normalisere som argument, og den returnerer en ny Pandas DataFrame med kolonnerne fladt ud. Du kan derefter slutte denne nye DataFrame til din gamle ved hjælp af Foreign Key, eller i dette tilfælde bruger vi kun den nye DataFrame. Lad os kun beholde feltet 'display_navn' (fra API_responset) sammen med 'lat' og 'lon'. Nedenfor er den nye DataFrame

Normaliser JSON for at få ny DataFrame

Lad os også tilføje en unik identifikator for hver række

importere uuid
new_df ['id'] = pd.Series ([uuid.uuid1 () for i inden for rækkevidde (len (new_df))])
new_df
Tilføjelse af unik identifikator

Trin 6 - Generer CSV-fil og upload til S3 Bucket

Følgende kode opretter en CSV-fil fra Pandas DataFrame til det specificerede bibliotek.

new_df.to_csv (path_or_buf = filenavn, indeks = falsk)

Metoden 'til_csv' fra Pandas opretter automatisk en indekskolonne, så vi kan undgå det ved at indstille 'indeks = usande'. CSV er nu oprettet, og vi kan uploade den til S3.

I denne tutorial vil vi bruge TinyS3 som er et meget let at bruge bibliotek til at arbejde med S3. Du kan også bruge Boto3, hvis du ønsker det.

import tinys3
import os
access_key = 'xxxxxxxxx'
secret_key = 'xxxxxxxxx'
endpoint = 'xxxxxxxx'
Bucket_name = 'xxxxxxxx'
conn = tinys3.Connection (access_key, secret_key, tls = False, endpoint)
f = åben (filnavn, 'rb')
conn.upload (filnavn, f, Bucket_name)
f.close ()
os.remove (filnavn)

Dokumentationen er meget selvforklarende og siger dybest set at tilføje din AWS adgangsnøgle, hemmelig adgangsnøgle og skovlnavn. Du kan derefter oprette en forbindelse til S3 og uploade den relevante fil. Vi sletter derefter filen fra drevet ved hjælp af 'os.remove (file_name)'.

Trin 7 - Opret redshift-tabel og kopier data ind i den

Du skal oprette en Redshift-tabel, før du kan kopiere data til den. Dette kan gøres med standard SQL-kommandoer til PostgreSQL-databaser, der udføres ved hjælp af Psycopg2, som er et PostgreSQL-bibliotek for Python.

Opret tabel

import psycopg2
my_db = 'xxxxxxx'
my_host = 'xxxxxxx'
my_port = 'xxxx'
my_user = 'xxxxxxxx'
my_password = 'xxxxxxx'
con = psycopg2.connect (dbname = my_db, host = my_host, port = my_port, bruger = my_user, password = my_password)
cur = con.cursor ()
sql_query = "CREATE TABLE reverse_geocode_location (lat varchar (255), lon varchar (255), display_name varchar (255), id varchar (255), PRIMARY KEY (id));"
cur.execute (sql_query)
con.commit ()
cur.close ()
con.close ()

Ved hjælp af Psycopg2 er det meget let at udføre SQL-kommandoer i Redshift eller en hvilken som helst anden PostgreSQL-motordatabase via Python. Vi skal først oprette en forbindelse, derefter en markør og til sidst udføre vores SQL-forespørgsel. Glem ikke at lukke forbindelsen til databasen, når SQL-forespørgslen er udført.

Kopier data fra S3 til Redshift

Vi er nu klar til det sidste og sidste trin i vores Tutorial - Copy CSV-fil fra S3 til Redshift. Årsagen til at vi bruger COPY i stedet for at bruge SQL Alchemy eller andre SQL-klienter, fordi Redshift er optimeret til søjleopbevaring, og denne metode er virkelig hurtig at indlæse data i den i stedet for at indlæse dataene række for række. Vi kan bruge Psycopg2 igen til dette.

Det er meget vigtigt at bemærke, at kolonnedatypene mellem CSV-filen og Redshift-tabellen skal være den samme og i samme rækkefølge, eller COPY-kommandoen mislykkes. Du kan kontrollere eventuelle LOAD-fejl ved at læse fra STL_LOAD_ERRORS-tabellen.

import psycopg2
my_db = 'xxxxxxx'
my_host = 'xxxxxxx'
my_port = 'xxxx'
my_user = 'xxxxxxxx'
my_password = 'xxxxxxx'
con = psycopg2.connect (dbname = my_db, host = my_host, port = my_port, bruger = my_user, password = my_password)
cur = con.cursor ()
sql_query = "" kopiér reverse_geocode_location fra 's3: // YOUR_BUCKET_NAME / YOUR_FILE_NAME' legitimationsoplysninger 'aws_access_key_id = YOUR_ACCESS_KEY; aws_secret_access_key = YOUR_SECRET_ACCESS_KEY' csvNULL '
cur.execute (sql_query)
con.commit ()
cur.close ()
con.close ()

I COPY-kommandoen ovenfor skal vi specificere spandens navn, filnavn, sikkerhedstaster og et par flag. En forklaring af de anvendte flag findes her:

Bemærk: Du skal give korrekte IAM-rolle tilladelser for at kopiere data fra S3 til Redshift.

Trin 8 - Læs data fra din tabel for at bekræfte

Når du har fulgt ovenstående trin, skal du nu kopiere dataene til din Redshift-tabel. Du kan bekræfte det ved at læse dataene ved hjælp af Psycopg2 i Python eller en hvilken som helst anden SQL-klient.

vælg * fra reverse_geocode_location

Konklusion og næste trin

Denne tutorial dækker nogle grundlæggende oplysninger om at bruge en Pandas-serie som input til at kalde et REST API og gemme resultatet i AWS Redshift. Men hvis dataene skaleres, er det vigtigt at:

  1. Abonner på API med høj ydeevne og høj lydstyrke.
  2. Reducer behandlingen i hukommelsen ved at skrive til disk gentagne gange eller udføre parallel behandling ved hjælp af biblioteker som Dask.
  3. Brug Apache Spark eller andre lignende teknologier til at håndtere meget stor databehandling.
  4. Vælg databaseteknologien korrekt baseret på ydelseskrav.

Jeg håber, at denne tutorial har været noget nyttigt, hvis du har spørgsmål, så kontakt os via kommentarerne.