Load Multiple Json Zip File From Gcs To Bigquery Using Dataflow Pipeline (python)
I am completely new to Dataflow and a novice programmer. I am looking for help designing a Dataflow pipeline written in Python to read multi-part compressed JSON files stored on GCS and load them into BigQuery.
Solution 1:
Beam Python's fileio transforms have what you need to read zipped JSON. You can specify a compression type and file suffix. The tutorial on File Processing will also be helpful.
Solution 2:
Here is a sample executable Python Beam pipeline, followed by sample raw data.
#------------Import Lib-----------------------#
import argparse
import json
import logging
import os
import sys
import time
from datetime import datetime

import apache_beam as beam
from apache_beam import window
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
#------------Set up BQ parameters-----------------------#
# Replace with Project Id
project = 'xxxxxxxxxxx'
# NOTE: `input` shadows the builtin of the same name; the name is kept
# because run() reads this module-level value as the source path.
input = 'gs://FILE-Path'


#------------Splitting Of Records----------------------#
class Transaction_ECOM(beam.DoFn):
    """DoFn that turns one JSON text line into one flat row dictionary."""

    def process(self, element):
        """Parse a JSON-encoded line and emit a single row.

        Args:
            element: A string holding one JSON object.

        Returns:
            A one-element list containing the row dict. Keys absent from the
            input are filled with the string ``'null'`` (``'0'`` for
            ``_indextime``); ``_si`` is always converted with ``str()``.

        Raises:
            KeyError: If the record has no ``_raw`` key (the only mandatory
                field).
            ValueError: If ``_indextime`` cannot be converted to ``int``, or
                if ``element`` is not valid JSON (``json.JSONDecodeError`` is
                a ``ValueError`` subclass).
        """
        logging.info(element)
        record = json.loads(element)

        index_time = record.get('_indextime', '0')
        # Strip embedded newlines from the raw payload.
        raw = record['_raw'].replace("\n", "")
        # NOTE(review): time.localtime() formats the epoch in the local time
        # zone of the machine running this code -- confirm whether UTC
        # (time.gmtime) is what the destination column expects.
        indexed_at = time.strftime(
            '%Y-%m-%dT%H:%M:%S', time.localtime(int(index_time)))

        return [{
            "datetime_indextime": indexed_at,
            "_bkt": record.get('_bkt', 'null'),
            "_cd": record.get('_cd', 'null'),
            "_indextime": index_time,
            "_kv": record.get('_kv', 'null'),
            "_raw": raw,
            "_serial": record.get('_serial', 'null'),
            "_si": str(record.get('_si', 'null')),
            "_sourcetype": record.get('_sourcetype', 'null'),
            "_subsecond": record.get('_subsecond', 'null'),
            "_time": record.get('_time', 'null'),
            "host": record.get('host', 'null'),
            "index": record.get('index', 'null'),
            "linecount": record.get('linecount', 'null'),
            "source": record.get('source', 'null'),
            "sourcetype": record.get('sourcetype', 'null'),
            "splunk_server": record.get('splunk_server', 'null'),
        }]
def run(argv=None, save_main_session=True):
    """Build and run the pipeline: read text lines, parse them, write to BigQuery.

    Args:
        argv: Command-line arguments handed to ``argparse``; with ``None``
            argparse reads ``sys.argv[1:]``.
        save_main_session: Value assigned to
            ``SetupOptions.save_main_session`` on the pipeline options.
    """
    parser = argparse.ArgumentParser()
    # No script-specific flags are declared, so every argument ends up in
    # pipeline_args and is handed to PipelineOptions.
    _, pipeline_args = parser.parse_known_args(argv)
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
    p1 = beam.Pipeline(options=pipeline_options)

    data_loading = (
        p1
        | 'Read from File' >> beam.io.ReadFromText(input, skip_header_lines=0)
    )

    project_id = "xxxxxxxxxxx"
    dataset_id = 'test123'
    table_schema_ECOM = ('datetime_indextime:DATETIME, _bkt:STRING, _cd:STRING, _indextime:STRING, _kv:STRING, _raw:STRING, _serial:STRING, _si:STRING, _sourcetype:STRING, _subsecond:STRING, _time:STRING, host:STRING, index:STRING, linecount:STRING, source:STRING, sourcetype:STRING, splunk_server:STRING')

    # Persist to BigQuery
    # WriteToBigQuery accepts the data as list of JSON objects
    #---------------------Index = ITF---------------------
    (
        data_loading
        | 'Clean-ITF' >> beam.ParDo(Transaction_ECOM())
        | 'Write-ITF' >> beam.io.WriteToBigQuery(
            table='CFF_ABC',
            dataset=dataset_id,
            project=project_id,
            schema=table_schema_ECOM,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
        )
    )

    # Start the job and block until it has finished.
    pipeline_result = p1.run()
    pipeline_result.wait_until_finish()
if __name__ == '__main__':
    # Set GOOGLE_APPLICATION_CREDENTIALS to the service-account key file
    # before the pipeline is built. Replace the path with your own key file.
    path_service_account = '/home/vibhg/Splunk/CFF/xxxxxxxxxxx-abcder125.json'
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = path_service_account
    run()
The script imports a few additional libraries that it does not use; you can ignore them.
Sample data, which can be stored on GCS, is given below:
{"_bkt": "A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsSalesOrderCreated\", Locality=\"NA\", Success=\"True\", BsExecutionTime=\"00:00:00.005\", OrderNo=\"374941817\", Locality=\"NA\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsSalesOrderCreated\"], [userId=\"s-oitp-u-global\"], [userIdRegion=\"NA\"], [msgId=\"aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc\"], [msgIdSeq=\"2\"], [originator=\"ISOM\"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
{"_bkt": "itf~412~2EE5428B-7CEA-4C49-A1E8-A5370FECA146", "_cd": "412:140787687", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:59,126 INFO [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsSalesOrderCreated\", Locality=\"NA\", Success=\"True\", BsExecutionTime=\"00:00:00.005\", OrderNo=\"374941817\", Locality=\"NA\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsSalesOrderCreated\"], [userId=\"s-oitp-u-global\"], [userIdRegion=\"NA\"], [msgId=\"aaaaaaaaaaaaaaaaaaabbbbbbbbbbbbbbcccc\"], [msgIdSeq=\"2\"], [originator=\"ISOM\"] ", "_serial": "0", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".126", "_time": "2021-01-25 14:28:59.126 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
{"_bkt": "9-A1E8-A5370FECA146", "_cd": "412:140787671", "_indextime": "1611584940", "_kv": "1", "_raw": "2021-01-25 14:28:58,659 INFO [com.abcd.mfs.builder.builders.BsLogEntryBuilder] [-] LogEntryType=\"BsCall\", fulName=\"EBCMFSSALES02\", BusinessServiceName=\"BsCreateOrderV2\", BsExecutionTime=\"00:00:01.568\", OrderNo=\"374942155\", CountryCode=\"US\", ClientSystem=\"owfe-webapp\" , [fulName=\"EBCMFSSALES02\"], [bsName=\"BsCreateOrderV2\"], [userId=\"s-salja1-u-irssemal\"], [userIdRegion=\"NA\"], [msgId=\"6652311fece28966\"], [msgIdSeq=\"25\"], [originator=\"SellingApi\"] ", "_serial": "1", "_si": ["9ttr-bfc-gcp-europe-besti1", "itf"], "_sourcetype": "BBClog", "_subsecond": ".659", "_time": "2021-01-25 14:28:58.659 UTC", "host": "shampo-lx4821.abcd.com", "index": "itf", "linecount": "1", "source": "/opt/VRE/WebSphere/lickserv/profiles/appsrv01/logs/na-ebtree02_srv/log4j2.log", "sourcetype": "BBClog", "web_server": "9ttr-bfc-gcp-europe-besti1"}
You can execute the script with the following command:
python script.py --region europe-west1 --project xxxxxxx --temp_location gs://temp/temp --runner DataflowRunner --job_name name
It may help you.
Post a Comment for "Load Multiple Json Zip File From Gcs To Bigquery Using Dataflow Pipeline (python)"