#!/usr/bin/env python3
DESCRIPTION="""
Import bulk-compatible JSONL format into Elasticsearch index.
"""
ARGS= [
{"name": "--host", "type": str, "default": "localhost", "help": "Elasticsearch host URL (default: localhost)"},
{"name": "--port", "type": str, "default": "9200", "help": "Elasticsearch port (default: 9200)"},
{"name": "file_name", "type": str, "help": "Name of the JSONL file to import"}
]
AUTHOR="mjnurse.github.io - 2026"
HELP_LINE="Import bulk-compatible JSONL format into Elasticsearch index"
WEB_DESC_LINE="Import bulk-compatible JSONL format into Elasticsearch index"
import argparse
import json
import sys
from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
def parse_args():
parser = argparse.ArgumentParser(
description="description:" + DESCRIPTION.replace("\n", "\n ")[:-2],
epilog="author:\n " + AUTHOR,
formatter_class=argparse.RawDescriptionHelpFormatter)
for arg in ARGS:
names = [arg["name"]] + ([arg["name2"]] if "name2" in arg else [])
kwargs = {k: v for k, v in arg.items() if k not in ("name", "name2") and v is not None}
parser.add_argument(*names, **kwargs)
return parser.parse_args()
def import_jsonl(jsonl_path, es_host, es_port):
es = Elasticsearch(f"http://{es_host}:{es_port}")
try:
es.info()
except Exception as e:
print(f"ERROR: Could not connect to Elasticsearch at {es_host}:{es_port}.\n\nException: {e}")
sys.exit(1)
if not jsonl_path.endswith(".jsonl"):
jsonl_path += ".jsonl"
def generate_actions():
with open(jsonl_path, "r", encoding="utf-8") as f:
while True:
meta_line = f.readline()
doc_line = f.readline()
if not meta_line or not doc_line:
break
meta = json.loads(meta_line)
doc = json.loads(doc_line)
action = {
"_op_type": "index",
"_index": meta["index"]["_index"],
"_id": meta["index"].get("_id"),
"_source": doc
}
yield action
success, failed = 0, 0
for ok, result in streaming_bulk(es, generate_actions(), raise_on_error=False):
if ok:
success += 1
else:
failed += 1
print(f"Imported: {success} documents")
if failed:
print(f"WARNING: Failed: {failed} documents")
def main():
args = parse_args()
try:
import_jsonl(args.file_name, args.host, args.port)
except Exception as e:
print(f"ERROR: {e}")
sys.exit(1)
if __name__ == "__main__":
main()