es-import - Import bulk-compatible JSONL format into Elasticsearch index

#!/usr/bin/env python3
# Description text for --help; parse_args indents it under a "description:" heading.
DESCRIPTION="""
Import bulk-compatible JSONL format into Elasticsearch index.
"""
# Command-line argument specs consumed by parse_args: "name" (plus an optional
# "name2" alias) becomes the argparse flag/positional, and every other key with
# a non-None value is forwarded to parser.add_argument() as a keyword argument.
ARGS= [
    {"name": "--host", "type": str, "default": "localhost", "help": "Elasticsearch host URL (default: localhost)"},
    {"name": "--port", "type": str, "default": "9200", "help": "Elasticsearch port (default: 9200)"},
    {"name": "file_name", "type": str, "help": "Name of the JSONL file to import"}
]
# Shown as the --help epilog by parse_args.
AUTHOR="mjnurse.github.io - 2026"

# One-line summaries not referenced by the code shown here.
# NOTE(review): presumably read by external help/website tooling that matches
# these exact lines. Confirm before renaming or reformatting them.
HELP_LINE="Import bulk-compatible JSONL format into Elasticsearch index"
WEB_DESC_LINE="Import bulk-compatible JSONL format into Elasticsearch index"

import argparse
import json
import os
import sys

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk

def parse_args():
    """Build an argparse parser from the ARGS table and parse sys.argv.

    The description is DESCRIPTION with every line indented by two spaces
    under a "description:" heading. The epilog credits AUTHOR.

    For each ARGS entry, "name" (and "name2" when present) supplies the
    option strings. All remaining keys whose value is not None are passed
    through to ``add_argument`` unchanged.

    Returns:
        The parsed ``argparse.Namespace``.
    """
    indented_description = DESCRIPTION.replace("\n", "\n  ")
    parser = argparse.ArgumentParser(
        # Drop the two spaces the replace() left after the final newline.
        description="description:" + indented_description[:-2],
        epilog="author:\n  " + AUTHOR,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    name_keys = ("name", "name2")
    for spec in ARGS:
        option_strings = [spec["name"]]
        if "name2" in spec:
            option_strings.append(spec["name2"])
        options = {}
        for key, value in spec.items():
            if key in name_keys or value is None:
                continue
            options[key] = value
        parser.add_argument(*option_strings, **options)
    return parser.parse_args()

# Action types accepted by the Elasticsearch _bulk API. Every type except
# "delete" is followed by a source line in a bulk file.
_BULK_OP_TYPES = ("index", "create", "update", "delete")


def _read_bulk_actions(jsonl_path):
    """Yield ``streaming_bulk`` action dicts parsed from a _bulk-format file.

    The file holds action/metadata lines such as
    ``{"index": {"_index": "x", "_id": "1"}}``. Each one is followed by a
    source line, except ``delete`` actions, which stand alone. Blank lines
    are ignored.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
        ValueError: if an action line is malformed, names an unsupported
            action, lacks "_index", or is missing its source line.
    """
    with open(jsonl_path, "r", encoding="utf-8") as f:
        # A single shared iterator of (line number, text) over non-blank lines,
        # so the for-loop and the next(...) call below consume it in step.
        lines = ((n, text) for n, text in enumerate(f, start=1) if text.strip())
        for line_no, meta_line in lines:
            meta = json.loads(meta_line)
            if not isinstance(meta, dict) or len(meta) != 1:
                raise ValueError(
                    f"line {line_no}: expected an action line with exactly one key")
            op_type, params = next(iter(meta.items()))
            if op_type not in _BULK_OP_TYPES:
                raise ValueError(
                    f"line {line_no}: unsupported bulk action {op_type!r}")
            if not isinstance(params, dict) or "_index" not in params:
                raise ValueError(
                    f"line {line_no}: {op_type!r} action has no '_index'")

            action = {"_op_type": op_type, "_index": params["_index"]}
            # Only send _id when the file provides one, rather than an
            # explicit null, leaving ID assignment to Elasticsearch otherwise.
            if params.get("_id") is not None:
                action["_id"] = params["_id"]

            if op_type != "delete":
                source = next(lines, None)
                if source is None:
                    raise ValueError(
                        f"line {line_no}: {op_type!r} action has no source line")
                action["_source"] = json.loads(source[1])
            yield action


def import_jsonl(jsonl_path, es_host, es_port):
    """Bulk-load a _bulk-format JSONL file into an Elasticsearch cluster.

    Supports index, create, update and delete actions (see
    ``_read_bulk_actions``). Prints a summary of imported/failed items.

    Args:
        jsonl_path: path to the file. ".jsonl" is appended when the path
            neither ends with ".jsonl" nor names an existing file.
        es_host: host name, or a base URL including its scheme
            (e.g. "https://es.example.com"). "http://" is assumed otherwise.
        es_port: port appended to the host.

    Returns:
        A ``(success, failed)`` tuple of bulk item counts.

    Raises:
        Errors from ``_read_bulk_actions`` for an unreadable or malformed
        file. Exceptions raised by ``streaming_bulk`` propagate. Exits the
        process with status 1 if ``es.info()`` fails (cluster unreachable).
    """
    base = es_host if "://" in es_host else f"http://{es_host}"
    es = Elasticsearch(f"{base.rstrip('/')}:{es_port}")
    try:
        try:
            es.info()
        except Exception as e:
            print(f"ERROR: Could not connect to Elasticsearch at {es_host}:{es_port}.\n\nException: {e}")
            sys.exit(1)

        # Use the path exactly as given when it exists (e.g. "data.ndjson").
        if not jsonl_path.endswith(".jsonl") and not os.path.isfile(jsonl_path):
            jsonl_path += ".jsonl"

        success, failed = 0, 0
        first_error = None
        # raise_on_error=False: count per-document failures instead of
        # aborting the whole import at the first one.
        for ok, result in streaming_bulk(
                es, _read_bulk_actions(jsonl_path), raise_on_error=False):
            if ok:
                success += 1
            else:
                failed += 1
                if first_error is None:
                    first_error = result
    finally:
        # Close the client on every exit path, including sys.exit() above.
        es.close()

    print(f"Imported: {success} documents")
    if failed:
        print(f"WARNING: Failed: {failed} documents")
        print(f"First failure: {first_error}")
    return success, failed

def main():
    """Script entry point: parse the command line and run the import.

    Any ordinary exception escaping the import is printed as a one-line
    "ERROR:" message and turned into exit status 1.
    """
    cli = parse_args()
    try:
        import_jsonl(cli.file_name, cli.host, cli.port)
    except Exception as err:  # top-level boundary: report, then fail
        print(f"ERROR: {err}")
        raise SystemExit(1)

if __name__ == "__main__":
    # main() has no return statement, so this exits with status 0 on success.
    sys.exit(main())