Adapt a customized data loading and data sampling pipeline to hf datasets

Hi community,
I’m working on a customized data loading and sampling pipeline that entangles many different operations together; it is not very modular, and I want to reimplement it with HF datasets.
The pipeline involves several operations:

  1. data loading:
    the whole corpus is a collection of several different image-text datasets, where the images are stored in lmdb format and the text labels are stored in json. The json file has the following format:

    {
      "448_128": [
        {
          "file_name": "hiertext_005abacb38bd4cba_34.jpg",
          "label": "Today I'm gonna make a comic!<|sn|>Webcomics are so popular, that<|sn|>must be a sure way to fame and<|sn|>fortune!",
          "type": "para"
        },
        ...
      ],
      ...
    }
    

    where items are grouped by the corresponding image’s resolution (448_128), and we can fetch the image from the LMDB by its file_name. The resolution grouping arranges images of similar resolution in the same batch during training, reducing wasted padding.

    Besides, each dataset also has a property is_doc, which indicates whether it comes from documents or scenes; this determines which data augmentation pipeline is applied afterwards.

    So here, the first question is how to convert this data corpus into a HF dataset?
    My ideas are:
    1. we may use DatasetDict to manage the whole collection.
    2. for each dataset in the collection, we extract the image with the file_name.
    3. for the grouping, I think it is better to de-group it first and create a resolution column, then regroup it during training if necessary.
    4. In the end, we will have an HF dataset with columns: image, label, type, resolution.
    The above may need to be adapted according to the data sampling I’m going to describe below.

  2. data sampling:
    The data sampling is delicate, which involves many tricks:

    1. The training runs for 10 epochs. Different datasets in the collection are sampled with different frequencies: some are sampled 1x per epoch, some 5x per epoch, and some with a pre-defined split, i.e. at epoch 0 they are sampled from label_split_0_epoch.json, and so on.
    2. As mentioned above, the sampling would be grouped by the resolution, too.
    3. Sharding for different devices when using DDP training.

    The second question is: can we achieve such a sampling strategy fully or partially with HF datasets, i.e. the sampling ratio/frequency and the grouping? (I think manual sharding is not needed since we use the memory-mapped Arrow format.)
    Another question: is the pre-defined split sampling truly necessary or common in practice? I think it just makes my sampler more complex.


It seems like there might be a lot of tricky spots in achieving this.


1) Converting the corpus into 🤗 Datasets (recommended approach)

What to store in Arrow vs keep external

A datasets.Dataset is an Arrow table, which gives fast reads and convenient column operations, but it doesn’t require you to store the heavy bytes inside Arrow. In fact, the “Image” feature typically expects paths or embedded bytes, which would either force you to (a) extract images to files or (b) duplicate LMDB bytes into Arrow. (Hugging Face)

Recommended: store metadata only in Arrow and keep LMDB as the image store. At training time, decode by file_name using a transform.

Shape of the dataset(s)

You can represent your multi-source corpus in either of these two equivalent ways:

  • Option A (common): DatasetDict with one Dataset per source
  • Option B: one merged Dataset with a source column

Option A is usually nicer when each source has different policies (sampling freq/caps/epoch splits), because you can keep per-source stats and debug easily.

Flatten the JSON grouped by "W_H"

Your idea to de-group and create explicit columns is correct. Suggested columns (a superset of what you proposed; a flattening sketch follows the list):

  • file_name (LMDB key)
  • label, type
  • width, height (parsed from "W_H")
  • resolution (string, optional)
  • bucket_id (int; exact (W,H) or binned)
  • is_doc (per-source constant, but store it per-row so transforms can branch)
  • source (string)
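
Here is a minimal flattening sketch, following Option A. The paths, the bucket_id formula, and the per-source is_doc flags are assumptions, not part of your original pipeline:

import json
from datasets import Dataset, DatasetDict

def flatten_source(json_path, source, is_doc):
    rows = []
    with open(json_path) as f:
        grouped = json.load(f)                    # {"448_128": [{...}, ...], ...}
    for res_key, items in grouped.items():
        width, height = map(int, res_key.split("_"))
        for item in items:
            rows.append({
                "file_name": item["file_name"],
                "label": item["label"],
                "type": item["type"],
                "width": width,
                "height": height,
                "bucket_id": width * 10_000 + height,  # any stable (W, H) -> int mapping
                "is_doc": is_doc,
                "source": source,
            })
    return Dataset.from_list(rows)

# hypothetical paths/names; one entry per source dataset
corpus = DatasetDict({
    "hiertext": flatten_source("hiertext/labels.json", "hiertext", is_doc=False),
})
corpus.save_to_disk("metadata_corpus")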

Why not store image as a Datasets “Image” feature here?

HF’s Image feature decodes from paths or stored bytes, and decode=False can return {path, bytes} instead of PIL.Image. This is useful when your data is already file-based or already stored as bytes in Arrow, but it doesn’t directly match “bytes live in LMDB keyed by file_name” without copying. (Hugging Face)
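
For illustration, this is roughly what decode control looks like, assuming a dataset ds that already has an "image" column stored via the Image feature:

from datasets import Image

ds = ds.cast_column("image", Image(decode=False))
print(ds[0]["image"])   # {'bytes': ..., 'path': ...} instead of a decoded PIL.Image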

Training-time decode/augment via set_transform / with_transform

HF explicitly supports on-the-fly transforms for images (set_transform) and recommends using map() for one-time preprocessing and set_transform for per-epoch augmentations. (Hugging Face)

So the “HF datasets” part becomes (a sketch of the step-2 transform follows the list):

  1. offline: build metadata datasets and save_to_disk()

  2. training: load_from_disk(), then attach a transform that:

    • reads LMDB bytes by file_name
    • applies doc/scene augmentation based on is_doc
    • runs your image processor and returns tensors + passthrough metadata
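
A minimal sketch of that training-time transform; the LMDB path, augment_doc, augment_scene, and image_processor are placeholders for your own components:

import io
import lmdb
from PIL import Image as PILImage
from datasets import load_from_disk

corpus = load_from_disk("metadata_corpus")
ds = corpus["hiertext"]

# in real code, open the environment lazily per DataLoader worker
env = lmdb.open("hiertext_lmdb", readonly=True, lock=False, readahead=False)

def decode_and_augment(batch):
    # set_transform passes columns as lists, even for single-item access
    images = []
    with env.begin() as txn:
        for file_name, is_doc in zip(batch["file_name"], batch["is_doc"]):
            raw = txn.get(file_name.encode("utf-8"))
            img = PILImage.open(io.BytesIO(raw)).convert("RGB")
            # augment_doc / augment_scene / image_processor are your own components (placeholders here)
            img = augment_doc(img) if is_doc else augment_scene(img)
            images.append(img)
    out = image_processor(images, return_tensors="pt")
    out["label"] = batch["label"]      # pass metadata through untouched
    return out

ds.set_transform(decode_and_augment)   # applied lazily on __getitem__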

2) Can HF Datasets implement your sampling strategy?

What HF can do directly (partial)

HF provides interleave_datasets() with the following knobs (see the sketch after this list):

  • probabilities (stochastic mixing)
  • stopping strategies like first_exhausted and oversampling modes like all_exhausted (Hugging Face)
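
For example, a minimal mixing sketch, assuming two already-built metadata datasets ds_a and ds_b:

from datasets import interleave_datasets

mixed = interleave_datasets(
    [ds_a, ds_b],
    probabilities=[0.2, 0.8],           # rough 1x-vs-5x style weighting
    seed=42,
    stopping_strategy="all_exhausted",  # oversample the smaller source
)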

This is helpful for rough mixing, but it does not naturally express:

  • “exactly 10 epochs with explicit epoch semantics”
  • “source A is exactly 1× per epoch and source B is exactly 5× per epoch”
  • “bucket by resolution and form batches within buckets”
  • “DDP-safe, deterministic, disjoint work per rank”

What you should do for strict semantics (recommended)

Use HF Datasets for indexable metadata storage and PyTorch for policy:

  • HF Dataset: metadata table, deterministic indexing

  • Training transform: LMDB decode + augmentation (set_transform / with_transform) (Hugging Face)

  • Custom PyTorch BatchSampler: the control plane for:

    • per-source quotas per epoch (1×, 5×, caps)
    • optional epoch-specific subset selection
    • bucketing (bucket_id) → batches
    • deterministic shuffles via set_epoch(epoch)
    • DDP partitioning at batch level

This division matches what HF is best at (dataset storage + column ops) and what PyTorch is best at (sampling/batching control).
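
As a sketch of that control plane (not your original implementation), assuming a merged metadata Dataset with source and bucket_id columns and a per-source repeat factor such as {"synth": 5, "real": 1}:

import math
import random
from collections import defaultdict

class QuotaBucketBatchSampler:
    def __init__(self, dataset, repeats, batch_size, rank=0, world_size=1, seed=0):
        # read metadata columns once, before any set_transform is attached
        sources, bucket_ids = dataset["source"], dataset["bucket_id"]
        self.buckets = defaultdict(list)
        for idx, (src, bid) in enumerate(zip(sources, bucket_ids)):
            self.buckets[bid].extend([idx] * repeats.get(src, 1))   # 1x / 5x quotas
        self.batch_size, self.rank, self.world_size = batch_size, rank, world_size
        self.seed, self.epoch = seed, 0
        total = sum(math.ceil(len(v) / batch_size) for v in self.buckets.values())
        self.num_batches = total // world_size    # identical step count on every rank

    def set_epoch(self, epoch):                   # call once per epoch, like DistributedSampler
        self.epoch = epoch

    def __iter__(self):
        rng = random.Random(self.seed + self.epoch)   # deterministic per-epoch shuffle
        batches = []
        for indices in self.buckets.values():
            indices = list(indices)
            rng.shuffle(indices)                  # shuffle within the resolution bucket
            for i in range(0, len(indices), self.batch_size):
                batches.append(indices[i:i + self.batch_size])   # bucket-pure batches
        rng.shuffle(batches)                      # global deterministic batch order
        usable = self.num_batches * self.world_size
        yield from batches[self.rank:usable:self.world_size]     # disjoint slice per rank

    def __len__(self):
        return self.num_batches

# usage sketch:
# sampler = QuotaBucketBatchSampler(meta_ds, {"synth": 5}, batch_size=32, rank=rank, world_size=world)
# loader = DataLoader(train_ds, batch_sampler=sampler, num_workers=4, collate_fn=collate)
# for epoch in range(10):
#     sampler.set_epoch(epoch)
#     for batch in loader:
#         ...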


3) DDP sharding: memory-mapped Arrow is not sharding

Arrow memory mapping improves I/O and avoids loading everything into RAM, but it doesn’t prevent two ranks from reading the same indices. You still need an explicit sharding mechanism (sampler or dataset sharding).

HF provides Dataset.shard(num_shards, index) as a deterministic way to split a dataset into N pieces. (Hugging Face)
HF’s Trainer/Accelerate will do distributed sharding for you, but if you’re building a custom loop you must ensure your own per-rank split. (Hugging Face Forums)

Separately, with PyTorch’s DistributedSampler, you must call sampler.set_epoch(epoch) each epoch to get different shuffles across epochs; otherwise, you can repeat the same order. (PyTorch Forums)
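
A minimal reminder of that pattern; train_ds, the batch size, and collate_fn are placeholders:

from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(train_ds, shuffle=True)
loader = DataLoader(train_ds, batch_size=32, sampler=sampler, collate_fn=collate_fn)

for epoch in range(10):
    sampler.set_epoch(epoch)   # without this, every epoch reuses the same shuffle
    for batch in loader:
        ...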

Practical recommendation for your case: partition at the batch level (global deterministic batch schedule, then rank takes every world_size-th batch). This makes “disjoint work + identical step counts” easy to reason about.


4) Resolution grouping (“bucketing”) with HF + PyTorch

HF itself doesn’t have a built-in “bucketed batch sampler” for your exact needs. The clean approach is:

  1. store bucket_id as a metadata column
  2. in your sampler, build bucket_id -> indices per source
  3. shuffle within bucket per epoch
  4. emit batches that are bucket-pure (or near-pure if you allow spillover)

HF docs explicitly note that if you want efficient batching and transforms, using a BatchSampler is the right tool on the PyTorch side. (Hugging Face)


5) Are predefined per-epoch split files necessary/common?

They’re not “standard” unless you’re doing something like:

  • curriculum learning (difficulty schedule)
  • strict replay / auditability requirements
  • staged inclusion (e.g., progressively adding noisy sources)
  • hard constraints from upstream labeling processes

If you don’t have one of those reasons, they mainly add complexity.

Simpler alternative: add a deterministic split_id per sample (e.g., hash of file_name) and define epoch subsets as rules like “epoch 0 uses split_id in [0..k)”, epoch 1 uses [k..2k), etc. This avoids managing 10 external JSON lists and keeps the logic local and reproducible.
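
A small sketch of that rule-based alternative; the number of splits, the hash choice, and the meta_ds name are assumptions:

import hashlib

NUM_SPLITS = 10

def split_id(file_name):
    # stable across runs and machines, unlike Python's built-in hash()
    return int(hashlib.md5(file_name.encode("utf-8")).hexdigest(), 16) % NUM_SPLITS

meta_ds = meta_ds.map(lambda row: {"split_id": split_id(row["file_name"])})

def indices_for_epoch(meta_ds, epoch):
    # e.g. "epoch e draws only from split_id == e"; any rule over split_id works
    return [i for i, s in enumerate(meta_ds["split_id"]) if s == epoch % NUM_SPLITS]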


Summary recommendation

  • Convert each source to a metadata-only HF Dataset (file_name,label,type,is_doc,width,height,bucket_id,source,...) and save_to_disk.
  • Use set_transform / with_transform to decode LMDB + augment + image_processor on the fly. (Hugging Face)
  • Implement strict mixing (1×/5×), epoch semantics, bucketing, and DDP disjointness in a custom PyTorch BatchSampler, not in interleave_datasets. (Hugging Face)
  • Do not rely on Arrow memory mapping for sharding; use Dataset.shard/DistributedSampler/batch-level partitioning. (Hugging Face)

Wow, thank you John, I’ll go check the other parts carefully.
Let me first ask about this: I flatten the JSON and extract the images from the LMDB, saving them to an HF dataset.

"""
Convert UniRec-40M corpus to HuggingFace datasets (Arrow format).
Reads images from LMDB and labels from JSON, outputs image-label pairs.
"""

import json
import lmdb
from PIL import Image
from datasets import Dataset, Features, Image as HFImage, Value
from pathlib import Path
import argparse
from tqdm import tqdm

def load_label_json(json_path):
    """Load and flatten label JSON file."""
    all_items = []
    empty_count = 0
    total_count = 0
    for file in Path(json_path).glob('label*.json'):
        print(f"Loading labels from {file}...")
        with open(file, 'r') as f:
            json_data = json.load(f)
        
        # The labels are stored in a JSON file with the format:
        # {"width_height": [ { "file_name": ..., "label": ... }, ... ], ...}
        # so we first flatten the dimension-grouped structure
        # and add a new field 'width_height'
        for width_height, items in json_data.items():
            for item in items:
                total_count += 1
                if item["label"] == "":
                    empty_count += 1
                    continue
                width, height = width_height.split('_')
                item['width_height'] = (int(width), int(height))
                all_items.append(item)
    print(f"{total_count} items, skipped {empty_count} empty labels")

    return all_items

def convert_lmdb_to_hf_dataset(lmdb_path, json_path, output_path):
    """
    Convert LMDB database + JSON labels to HuggingFace dataset.
    
    Args:
        lmdb_path: Path to LMDB database containing images
        json_path: Path to the directory containing label JSON files
        output_path: Path to save the HuggingFace dataset
    """
    print(f"Loading labels from {json_path}...")
    label_items = load_label_json(json_path)
    print(f"Found {len(label_items)} items")
    
    keys = label_items[0].keys() # {'file_name', 'label', 'width_height', 'type'}
    
    # Create HuggingFace dataset
    print("Creating HuggingFace dataset...")
    features = Features({
        'image': Value('binary'),
        **{key: Value('string') for key in keys}
    })
    print("features:", features)
    
    def data_generator():
        """Generator to yield items one at a time without loading all into memory."""
        with lmdb.open(
            lmdb_path,
            max_readers=32,
            readonly=True,
            lock=False,
            readahead=False,
            meminit=False
        ) as env:
            with env.begin() as txn:
                for item in tqdm(label_items):
                    if isinstance(item["label"], list):
                        item["label"] = '\n\n'.join(item["label"])

                    file_name = item['file_name']
                    image = txn.get(file_name.encode('utf-8'))

                    if image is None:
                        print(f"Warning: Image {file_name} not found in LMDB.")
                        continue

                    yield {
                        'image': image,
                        **{key: item[key] for key in keys}
                    }
    
    print("Processing images...")
    dataset = Dataset.from_generator(data_generator, features=features)
    
    print(f"Successfully processed {len(dataset)} items")
    
    # Save dataset
    print(f"Saving dataset to {output_path}...")
    
    dataset.save_to_disk(output_path)
    print(f"Dataset saved successfully!")
    print(f"Dataset info: {dataset}")


def main():
    parser = argparse.ArgumentParser(
        description='Convert LMDB database to HuggingFace dataset'
    )
    parser.add_argument('--lmdb_path', type=str, required=True, help='Path to LMDB database directory')
    parser.add_argument('--json_path', type=str, required=True, help='Path to the directory containing label JSON files')
    parser.add_argument('--output_path', type=str, required=True, help='Output path for HuggingFace dataset')
    args = parser.parse_args()
    
    # Verify paths exist
    if not Path(args.lmdb_path).exists():
        raise FileNotFoundError(f"LMDB path not found: {args.lmdb_path}")
    if not Path(args.json_path).exists():
        raise FileNotFoundError(f"JSON path not found: {args.json_path}")

    if Path(args.output_path).exists():
        print(f"dataset already exists at {args.output_path}, skip.")
        print("-"*100)
        return
    Path(args.output_path).parent.mkdir(parents=True, exist_ok=True)
    print(f"build dataset at {args.output_path}")
    convert_lmdb_to_hf_dataset(args.lmdb_path, args.json_path, args.output_path)
    print("-"*100)


if __name__ == '__main__':
    main()

And I find two things surprising:

  1. The HF dataset only occupies 2.37 GB (image + text), while the original data occupies 11 GB of LMDB plus 140 MB of JSON; what a huge compression rate! I know we save some space on the field names in the JSON files by converting them to columns, but I don’t understand the LMDB side: shouldn’t it already be a compact format? (BTW, you may notice that during the conversion I skip items with empty labels, but for this particular dataset no item is skipped, so the reduction is not due to the skipping.)
  2. The HF dataset is split into 6 .arrow files, but they are neither equally divided in size (289 MB, 268 MB, 266 MB, 267 MB, 325 MB, 1015.16 MB) nor in number of samples (see below):
// dataset_info.json
"size_in_bytes": 2542646183,
  "splits": {
    "train": {
      "name": "train",
      "num_bytes": 2542646183,
      "num_examples": 1059606,
      "shard_lengths": [
        316600,
        301500,
        276100,
        73100,
        77200,
        15106
      ],
      "dataset_name": "generator"
    }
  }, 

So are these phenomena expected? In particular, if the compression holds, it would be worth switching from those heavy LMDBs to HF datasets.


Hmm… So complicated behavior…


What you observed is plausible, but it usually isn’t “magical compression”

You have two separate effects:

  1. LMDB on-disk size can be much larger than the live payload (free pages / page-level overhead / history).
  2. HF save_to_disk() shards by approximate bytes, not by equal row counts, and shard sizing can overshoot in real cases.

Below is the detailed “why” and how to verify.


1) Why can Arrow (save_to_disk) be far smaller than the LMDB directory?

1.1 LMDB size often includes “dead space” that doesn’t go away

LMDB files can accumulate unused pages (e.g., after deletes, rewrites, or ingestion patterns) and the file doesn’t automatically shrink; copying/compacting is the typical way to reclaim space. This is why “copying an LMDB makes it smaller” is a known/expected behavior. (Stack Overflow)

What this means for your numbers

  • Your Arrow dataset is ~2.54GB for 1,059,606 examples → ~2.4KB/example total.
  • Your LMDB directory is ~11GB → ~10KB/example.
    A ~7–9GB gap can be explained by free pages + page overhead if your “live bytes” are closer to the Arrow size.

1.2 LMDB storage overhead can be significant for many small values

LMDB stores data in B+tree pages (page granularity matters). If many values are relatively small, page-level packing/overflow behavior can increase file size versus raw bytes.

1.3 Arrow “Image/binary in Arrow” is not necessarily compressed

Arrow IPC supports buffer compression (LZ4/ZSTD), but it’s optional and controlled by writer options. If compression is None, buffers are uncompressed. (Apache Arrow)
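
For reference, a small pyarrow sketch of that writer option (this is the raw Arrow IPC API, not something save_to_disk exposes directly; the file name and dummy payload are arbitrary):

import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({"image": pa.array([b"\x00" * 1024] * 100, type=pa.binary())})
options = ipc.IpcWriteOptions(compression="zstd")   # default is no compression
with ipc.new_file("compressed.arrow", table.schema, options=options) as writer:
    writer.write_table(table)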

HF Datasets doesn’t present save_to_disk() as a general “compression feature”, and there’s an open question from users about what the canonical way to compress a dataset is—suggesting compression is not something you should assume is happening automatically. (GitHub)

So the default explanation for “11GB → 2.37GB” is usually: LMDB had lots of non-live space, not that Arrow magically compressed JPEGs.


2) Why do you get multiple .arrow shards with uneven sizes and uneven sample counts?

2.1 save_to_disk() shards by max shard size (default 500MB)

HF documents max_shard_size as default "500MB". (Hugging Face)
So multiple Arrow files are expected.

2.2 Uneven example counts per shard is expected with variable-sized rows

Your dataset rows include image bytes; image byte lengths can vary dramatically by resolution and content. If sharding aims for a size cap, shards will contain very different numbers of examples.

Your shard_lengths show exactly that: early shards have hundreds of thousands of examples, the last shard has ~15k examples but ~1GB. That pattern typically means the tail of your iteration had much larger image payloads.

2.3 “max_shard_size” is not always strictly enforced

There are known situations where shard size overshoots the requested max (especially with large/variable rows), and HF has issues discussing unreliable shard sizing. (GitHub)
There are also related issues around large rows and shard size assumptions. (GitHub)


3) How to tell if the Arrow dataset truly stores the same image bytes (and where the size gap comes from)

3.1 Compare hashes of image bytes (LMDB vs Arrow)

Pick N random samples and compare sha256(lmdb_bytes) vs sha256(dataset_bytes).

If hashes match, Arrow is storing the same bytes; size gap is from LMDB overhead/free space.
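
A quick spot-check sketch; the paths and the 100-sample count are arbitrary, and the "image" column holds raw bytes as in your script:

import hashlib
import random
import lmdb
from datasets import load_from_disk

ds = load_from_disk("unirec_hf")
env = lmdb.open("unirec_lmdb", readonly=True, lock=False)

with env.begin() as txn:
    for idx in random.sample(range(len(ds)), 100):
        row = ds[idx]
        lmdb_bytes = txn.get(row["file_name"].encode("utf-8"))
        assert hashlib.sha256(lmdb_bytes).digest() == hashlib.sha256(row["image"]).digest(), row["file_name"]
print("sampled images are byte-identical in LMDB and Arrow")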

3.2 Measure “live bytes” in LMDB

Use LMDB stats to estimate used pages and compare to file size.

Typical quick checks (Python):

  • env.stat()["psize"] and env.info()["last_pgno"] allow estimating file pages in use.
  • Compare with the OS-reported LMDB file size.

Also, do a compaction copy (“vacuum”) and compare the new LMDB file size; it often drops substantially when free space existed. The “copy shrinks LMDB” effect is widely observed. (Stack Overflow)
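
A sketch of those checks with the Python lmdb binding; the paths are assumptions, and free pages inside the used extent are not subtracted, so treat this as an estimate only:

import os
import lmdb

env = lmdb.open("unirec_lmdb", readonly=True, lock=False)
psize = env.stat()["psize"]             # page size in bytes
last_pgno = env.info()["last_pgno"]     # highest page number in use
used_bytes = (last_pgno + 1) * psize    # rough upper bound on the live extent
file_bytes = os.path.getsize(os.path.join("unirec_lmdb", "data.mdb"))
print(f"~{used_bytes / 1e9:.2f} GB used of {file_bytes / 1e9:.2f} GB on disk")

# compaction copy ("vacuum"); the destination directory must already exist
os.makedirs("unirec_lmdb_compacted", exist_ok=True)
env.copy("unirec_lmdb_compacted", compact=True)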


4) If the space savings hold, should you replace LMDB with Arrow?

When it can be a good idea

  • You want a single portable artifact for training (Arrow shards + metadata), easy to copy and memory-map.
  • Your LMDBs are large mainly due to free pages / historical growth, and you don’t want to manage compaction.

When to be cautious

  • If your images are already JPEG/WebP, “compression” may not be real; you might just be removing LMDB overhead.
  • If you rely on LMDB’s key-value access patterns and tuned behavior for random reads, Arrow may behave differently (often fine, but measure throughput).

Practical advice: decide based on measured training throughput (images/sec) with your actual num_workers, augmentation, and storage backend.


5) Two concrete improvements to your conversion code (quality-of-life)

5.1 Use the HF Image feature instead of Value("binary") (optional but useful)

HF explicitly documents that the Image feature can load PIL images from bytes. (Hugging Face)
That makes downstream transforms simpler (you get decoded images via feature decoding, when enabled).

5.2 Make width and height explicit columns

Instead of storing width_height as a tuple coerced into a string, store:

  • width: int32, height: int32, plus a derived bucket_id.

That makes bucketing and filtering more efficient and avoids silent dtype conversions.
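
A sketch of the adjusted schema and row construction; the bucket_id formula and the to_row helper are assumptions, not part of your script:

from datasets import Features, Image as HFImage, Value

features = Features({
    "image": HFImage(),            # decoded to a PIL.Image on access
    "file_name": Value("string"),
    "label": Value("string"),
    "type": Value("string"),
    "width": Value("int32"),
    "height": Value("int32"),
    "bucket_id": Value("int32"),
})

def to_row(item, image_bytes):
    width, height = item["width_height"]
    return {
        "image": {"bytes": image_bytes, "path": None},   # the Image feature accepts raw bytes
        "file_name": item["file_name"],
        "label": item["label"],
        "type": item["type"],
        "width": width,
        "height": height,
        "bucket_id": width * 10_000 + height,
    }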


6) Why your shard imbalance is so extreme (what to check)

Given you have one shard with ~15k samples but ~1GB, check:

  • whether the iterator order clusters very large resolutions at the end (JSON dict order, file order)
  • whether some subset contains significantly larger encoded images (e.g., higher resolution, different format/quality)

If you want more even shards (a small sketch follows this list):

  • shuffle the flattened list before writing, or
  • write per-resolution groups separately (but keep metadata for bucketing).
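
For example, in your conversion script (the seed and shard size are arbitrary; label_items, dataset, and output_path are the names already used there):

import random

random.seed(0)
random.shuffle(label_items)   # before Dataset.from_generator(data_generator, ...)

dataset.save_to_disk(output_path, max_shard_size="500MB")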

Key takeaway

  • Yes, multi-shard Arrow with uneven shard sizes/counts is expected because save_to_disk() shards by size (default 500MB) and rows are variable-sized. (Hugging Face)
  • Yes, “Arrow much smaller than LMDB” can be expected if your LMDB contains lots of free pages/overhead; compact/copy LMDB and compare to confirm. (Stack Overflow)
  • Don’t assume Arrow IPC compression is responsible unless you explicitly enable it; Arrow buffer compression is optional, and defaults can be uncompressed. (Apache Arrow)


Indeed, when I asked GPT about this, it told me that the apparent size (ls -lh) and the physical on-disk size (du -h) are not the same. So going from 6G to 2.7G makes more sense to me.

1 Like