Source code for pybd

#!/usr/bin/env python

# Copyright 2019 John T. Foster
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import psycopg2
import numpy as np
import pandas as pd


# Optional Bazean database credentials, read from the environment when this
# module is imported. Each name is bound to None when its variable is not set.
bazean_postgres_username = os.environ.get("BAZEAN_POSTGRES_USERNAME")
bazean_postgres_password = os.environ.get("BAZEAN_POSTGRES_PASSWORD")


class PyBD(object):
    """Class for querying Bazean database.

    Args:
        user (str): Bazean database username, defaults to use the environment
            variable `BAZEAN_POSTGRES_USERNAME` if assigned
        password (str): Bazean database password, defaults to use the
            environment variable `BAZEAN_POSTGRES_PASSWORD` if assigned
        subdomain (str): URL subdomain, default is "premium"
        schema (str): Postgres schema, default is "public"
    """

    def __init__(
        self,
        user=None,
        password=None,
        subdomain="premium",
        schema="public",
    ):
        # Credentials are looked up here rather than stored as default
        # argument values, so they never appear in the function signature
        # (``help()``, generated API docs) and the environment is read when
        # the instance is created.
        if user is None:
            user = os.environ.get("BAZEAN_POSTGRES_USERNAME")
        if password is None:
            password = os.environ.get("BAZEAN_POSTGRES_PASSWORD")

        url = "postgresql://{}.bazean.com:5432/db?ssl=true".format(subdomain)

        self.__default_fetch_size = 50
        # Assigned before connecting so that `__del__` always finds them,
        # even when `connect` raises.
        self.__connection = None
        self.__cursor = None

        # Create connection to database
        self.__connection = psycopg2.connect(user=user, password=password, dsn=url)
        self.__cursor = self.__connection.cursor()
        # NOTE: `schema` is formatted into the statement as an identifier, not
        # bound as a value, so only trusted schema names should be passed.
        self.__cursor.execute("SET search_path TO {}".format(schema))

    def __del__(self):
        """Destructor to close database connection.

        The cursor is closed before the connection it was created from. Both
        attributes are looked up defensively so that an instance whose
        `__init__` did not complete is torn down without an `AttributeError`.
        """
        cursor = getattr(self, "_PyBD__cursor", None)
        connection = getattr(self, "_PyBD__connection", None)
        try:
            if cursor is not None:
                cursor.close()
        finally:
            if connection is not None:
                connection.close()

    def __fetch(self, raw_sql_string, number_of_records_to_fetch=None, parameters=None):
        """Fetches records from database

        Args:
            raw_sql_string (str): SQL query string, optionally containing
                `%s` placeholders
            number_of_records_to_fetch (int or str): limits the length of
                returned records. An `int` fetches at most that many records,
                `'all'` fetches every record, and `None` falls back to the
                size configured with `set_fetch_size`.
            parameters (tuple): values bound to the `%s` placeholders of
                `raw_sql_string`; `None` (default) executes the string as is.

        Returns:
            (list): A Python list with the records returned by the sql query.

        Raises:
            ValueError: if the fetch size is neither an `int` nor `'all'`.
        """
        if number_of_records_to_fetch is None:
            number_of_records_to_fetch = self.__default_fetch_size

        fetch_all = (
            isinstance(number_of_records_to_fetch, str)
            and number_of_records_to_fetch == "all"
        )
        # Validate before executing so that no query is run whose result
        # could not be retrieved.
        if not fetch_all and not isinstance(number_of_records_to_fetch, int):
            raise ValueError(
                "fetch size must be an int or 'all', got {!r}".format(
                    number_of_records_to_fetch
                )
            )

        if parameters:
            self.__cursor.execute(raw_sql_string, parameters)
        else:
            self.__cursor.execute(raw_sql_string)

        if fetch_all:
            records = self.__cursor.fetchall()
        else:
            records = self.__cursor.fetchmany(size=number_of_records_to_fetch)
        return [list(record) for record in records]

    @staticmethod
    def __format_columns(columns):
        """Returns the comma separated column list of a `SELECT` statement.

        Args:
            columns (str or tuple of str): `'*'`, a single column name, or a
                sequence of column names

        Returns:
            (str): The column list

        Raises:
            ValueError: if `columns` is empty.
        """
        if not columns:
            raise ValueError("at least one column name is required")
        # A bare string is a single selection ("*" or one column name); it
        # must not be joined character by character.
        if isinstance(columns, str):
            return columns
        return ",".join(columns)

    def __build_query_string(self, table, columns, **kwargs):
        """Builds SQL query string from columns requested and table names

        The values are written into the string verbatim, without escaping.
        The method is kept for inspecting the query that corresponds to a
        request; `get` binds the values as parameters instead.

        Args:
            table (str): SQL table name
            columns (tuple of str): A tuple of column names to select
            **kwargs: Arbitrary number of keyword arguments of the form
                `key=value`. These would be expected to construct the `WHERE`
                portion of the SQL statement with a logical `AND` operation.
                For example:

                ```python
                db = PyBD()
                db._PyBD__build_query_string('production_all', ('apis',),
                                             state='KS', api='15001016610000')
                ```

                Would result in the string:

                ```sql
                SELECT apis FROM production_all WHERE state='KS' AND api='15001016610000'
                ```

        Returns:
            (str): The SQL query statement
        """
        query_string = "SELECT {} FROM {}".format(
            self.__format_columns(columns), table
        )
        conditions = " AND ".join(
            "{}='{}'".format(key, value) for key, value in kwargs.items()
        )
        if conditions:
            query_string += " WHERE " + conditions
        return query_string

    def __build_parameterized_query(self, table, columns, **kwargs):
        """Builds a SQL query with `%s` placeholders and its bound values

        Args:
            table (str): SQL table name
            columns (tuple of str): A tuple of column names to select
            **kwargs: `key=value` pairs combined with `AND` into the `WHERE`
                portion of the statement.

        Returns:
            (tuple): `(query_string, parameters)`. `parameters` is `None` when
            no keyword arguments are given, otherwise a tuple with the string
            form of every value, in the order of the placeholders.
        """
        select_clause = "SELECT {} FROM {}".format(
            self.__format_columns(columns), table
        )
        if not kwargs:
            return select_clause, None

        # Once parameters are passed, a literal "%" in the statement has to
        # be doubled so it is not mistaken for a placeholder.
        conditions = " AND ".join(
            "{}=%s".format(key.replace("%", "%%")) for key in kwargs
        )
        query_string = "{} WHERE {}".format(
            select_clause.replace("%", "%%"), conditions
        )
        # Values are passed as strings so they are still compared as quoted
        # literals, as in the string built by `__build_query_string`.
        parameters = tuple(str(value) for value in kwargs.values())
        return query_string, parameters

    def set_fetch_size(self, value):
        """Sets the fetch size for database query methods i.e. `get_` methods

        Args:
            value (int or `'all'`): fetch size, default is 50.
        """
        self.__default_fetch_size = value

    def get(self, table, columns, **kwargs):
        """General function to construct SQL query statement and retrieve
        columns of data

        Args:
            table (str): SQL table name
            columns (tuple of str): A tuple of column names to select
            **kwargs: Arbitrary number of keyword arguments of the form
                `key=value`. These would be expected to construct the `WHERE`
                portion of the SQL statement with a logical `AND` operation.
                For example::

                    db = PyBD()
                    db.get('production_all', ('apis',), state='KS',
                           api='15001016610000')

                and would result in the query string::

                    SELECT apis FROM production_all WHERE state='KS' AND api='15001016610000'

                In this simple case, the function would return a list
                `[['15001016610000']]`. The values are handed to the database
                driver as bound parameters rather than being formatted into
                the statement.

        Returns:
            (list): Nested list of returned columns of data from the SQL query.

        Raises:
            ValueError: if `columns` is empty or the configured fetch size is
                neither an `int` nor `'all'`.
        """
        query_string, parameters = self.__build_parameterized_query(
            table, columns, **kwargs
        )
        return self.__fetch(
            query_string,
            number_of_records_to_fetch=self.__default_fetch_size,
            parameters=parameters,
        )

    def get_tickers_by_state(self, state):
        """Returns the stock tickers (actual or assigned) of companies that
        operate/own wells in a given state.

        Args:
            state (str): The two letter postal code of a state, i.e. "TX" or
                "NM"

        Returns:
            (list of str): A list containing the stock tickers. Entries
            returned as `None` by the query are omitted.
        """
        db_tickers = self.get(table="well_all", columns=("parent_ticker",), state=state)
        # The builtin `str` replaces the `np.str` alias, which no longer
        # exists in current NumPy releases.
        unique_tickers = np.unique(np.array(db_tickers, dtype=str))
        return unique_tickers[unique_tickers != "None"].tolist()

    def get_well_locations_by_ticker_and_state(self, ticker, state):
        """Returns the latitude, longitude and API number of wells

        Wells whose surface latitude is `None` are left out. When the query
        returns no records the DataFrame has the three columns and no rows.

        Args:
            ticker (str): Company stock ticker, i.e. "XOM"
            state (str): The two letter postal code of a state, i.e. "TX" or
                "NM"

        Returns:
            (DataFrame): Pandas DataFrame
        """
        records = self.get(
            table="well_all",
            columns=("latitude_surface_hole", "longitude_surface_hole", "api"),
            parent_ticker=ticker,
            state=state,
        )
        if records:
            # Object dtype keeps `None` entries intact for the filter below.
            latitude, longitude, api = np.array(records, dtype=object).T
        else:
            latitude = longitude = api = np.empty(0, dtype=object)

        has_latitude = np.array([value is not None for value in latitude], dtype=bool)

        # Convert to pandas dataframe
        df = {
            "latitude_surface_hole": latitude[has_latitude].astype(float),
            "longitude_surface_hole": longitude[has_latitude].astype(float),
            "api": api[has_latitude].astype(str),
        }
        return pd.DataFrame(data=df)

    def get_production_from_api(self, api):
        """Returns the total production histories for a given API number

        Every matching record is fetched, whatever fetch size is configured
        with `set_fetch_size`; the configured size is left untouched.

        Args:
            api (str): API number for requested well production histories

        Returns:
            (DataFrame): Pandas DataFrame indexed by date, or `None` when the
            query returns no records.
        """
        query_string, parameters = self.__build_parameterized_query(
            "production_all",
            (
                "date",
                "volume_oil_formation_bbls",
                "volume_gas_formation_mcf",
                "volume_water_formation_bbls",
            ),
            api=api,
        )
        records = self.__fetch(
            query_string, number_of_records_to_fetch="all", parameters=parameters
        )
        if not records:
            return None

        date, oil, gas, water = np.array(records, dtype=object).T

        # Convert to pandas dataframe
        df = {
            "volume_oil_formation_bbls": oil.astype("double"),
            "volume_gas_formation_mcf": gas.astype("double"),
            "volume_water_formation_bbls": water.astype("double"),
        }
        return pd.DataFrame(data=df, index=pd.to_datetime(date))