Search
Calendar
August 2025
S M T W T F S
« Jul    
 12
3456789
10111213141516
17181920212223
24252627282930
31  
Archives

Posts Tagged ‘Python’

PostHeaderIcon Demystifying Parquet: The Power of Efficient Data Storage in the Cloud

Unlocking the Power of Apache Parquet: A Modern Standard for Data Efficiency

In today’s digital ecosystem, where data volume, velocity, and variety continue to rise, the choice of file format can dramatically impact performance, scalability, and cost. Whether you are an architect designing a cloud-native data platform or a developer managing analytics pipelines, Apache Parquet stands out as a foundational technology you should understand — and probably already rely on.

This article explores what Parquet is, why it matters, and how to work with it in practice — including real examples in Python, Java, Node.js, and Bash for converting and uploading files to Amazon S3.

What Is Apache Parquet?

Apache Parquet is a high-performance, open-source file format designed for efficient columnar data storage. Originally developed by Twitter and Cloudera and now an Apache Software Foundation project, Parquet is purpose-built for use with distributed data processing frameworks like Apache Spark, Hive, Impala, and Drill.

Unlike row-based formats such as CSV or JSON, Parquet organizes data by columns rather than rows. This enables powerful compression, faster retrieval of selected fields, and dramatic performance improvements for analytical queries.

Why Choose Parquet?

✅ Columnar Format = Faster Queries

Because Parquet stores values from the same column together, analytical engines can skip irrelevant data and process only what’s required — reducing I/O and boosting speed.

Compression and Storage Efficiency

Parquet achieves better compression ratios than row-based formats, thanks to the similarity of values in each column. This translates directly into reduced cloud storage costs.

Schema Evolution

Parquet supports schema evolution, enabling your datasets to grow gracefully. New fields can be added over time without breaking existing consumers.

Interoperability

The format is compatible across multiple ecosystems and languages, including Python (Pandas, PyArrow), Java (Spark, Hadoop), and even browser-based analytics tools.

☁️ Using Parquet with Amazon S3

One of the most common modern use cases for Parquet is in conjunction with Amazon S3, where it powers data lakes, ETL pipelines, and serverless analytics via services like Amazon Athena and Redshift Spectrum.

Here’s how you can write Parquet files and upload them to S3 in different environments:

From CSV to Parquet in Practice

Python Example

import pandas as pd

# Load CSV data
df = pd.read_csv("input.csv")

# Save as Parquet
df.to_parquet("output.parquet", engine="pyarrow")

To upload to S3:

import boto3

s3 = boto3.client("s3")
s3.upload_file("output.parquet", "your-bucket", "data/output.parquet")

Node.js Example

Install the required libraries:

npm install aws-sdk

Upload file to S3:

const AWS = require('aws-sdk');
const fs = require('fs');

const s3 = new AWS.S3();
const fileContent = fs.readFileSync('output.parquet');

const params = {
    Bucket: 'your-bucket',
    Key: 'data/output.parquet',
    Body: fileContent
};

s3.upload(params, (err, data) => {
    if (err) throw err;
    console.log(`File uploaded successfully at ${data.Location}`);
});

☕ Java with Apache Spark and AWS SDK

In your pom.xml, include:

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.12.2</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-s3</artifactId>
    <version>1.12.470</version>
</dependency>

Spark conversion:

Dataset<Row> df = spark.read().option("header", "true").csv("input.csv");
df.write().parquet("output.parquet");

Upload to S3:

AmazonS3 s3 = AmazonS3ClientBuilder.standard()
    .withRegion("us-west-2")
    .withCredentials(new AWSStaticCredentialsProvider(
        new BasicAWSCredentials("ACCESS_KEY", "SECRET_KEY")))
    .build();

s3.putObject("your-bucket", "data/output.parquet", new File("output.parquet"));

Bash with AWS CLI

aws s3 cp output.parquet s3://your-bucket/data/output.parquet

Final Thoughts

Apache Parquet has quietly become a cornerstone of the modern data stack. It powers everything from ad hoc analytics to petabyte-scale data lakes, bringing consistency and efficiency to how we store and retrieve data.

Whether you are migrating legacy pipelines, designing new AI workloads, or simply optimizing your storage bills — understanding and adopting Parquet can unlock meaningful benefits.

When used in combination with cloud platforms like AWS, the performance, scalability, and cost-efficiency of Parquet-based workflows are hard to beat.


PostHeaderIcon Creating EPUBs from Images: A Developer’s Guide to Digital Publishing

Ever needed to convert a collection of images into a professional EPUB file? Whether you’re working with comics, manga, or any image-based content, I’ve developed a Python script that makes this process seamless and customizable.

What is create_epub.py?

This Python script transforms a folder of images into a fully-featured EPUB file, complete with:

  • Proper EPUB 3.0 structure
  • Customizable metadata
  • Table of contents
  • Responsive image display
  • Cover image handling

Key Features

  • Smart Filename Generation: Automatically generates EPUB filenames based on metadata (e.g., “MyBook_01_1.epub”)
  • Comprehensive Metadata Support: Title, author, series, volume, edition, ISBN, and more
  • Image Optimization: Supports JPEG, PNG, and GIF formats with proper scaling
  • Responsive Design: CSS-based layout that works across devices
  • Detailed Logging: Progress tracking and debugging capabilities

Usage Example

python create_epub.py image_folder \
    --title "My Book" \
    --author "Author Name" \
    --volume 1 \
    --edition "First Edition" \
    --series "My Series" \
    --publisher "My Publisher" \
    --isbn "978-3-16-148410-0"

Technical Details

The script creates a proper EPUB 3.0 structure with:

  • META-INF/container.xml
  • OEBPS/content.opf (metadata)
  • OEBPS/toc.ncx (table of contents)
  • OEBPS/nav.xhtml (navigation)
  • OEBPS/style.css (responsive styling)
  • OEBPS/images/ (image storage)

Best Practices Implemented

  • Proper XML namespaces and validation
  • Responsive image handling
  • Comprehensive metadata support
  • Clean, maintainable code structure
  • Extensive error handling and logging

Getting Started

# Install dependencies
pip install -r requirements.txt

# Basic usage
python create_epub.py /path/to/images --title "My Book"

# With debug logging
python create_epub.py /path/to/images --title "My Book" --debug

The script is designed to be both powerful and user-friendly, making it accessible to developers while providing the flexibility needed for professional publishing workflows.

Whether you’re a developer looking to automate EPUB creation or a content creator seeking to streamline your publishing process, this tool provides a robust solution for converting images into EPUB files.

The script on GitHub or below: 👇👇👇

import os
import sys
import logging
import zipfile
import uuid
from datetime import datetime
import argparse
from PIL import Image
import xml.etree.ElementTree
from xml.dom import minidom

# @author Jonathan Lalou / https://github.com/JonathanLalou/

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)

# Define the CSS content
CSS_CONTENT = '''
body {
    margin: 0;
    padding: 0;
    display: flex;
    justify-content: center;
    align-items: center;
    min-height: 100vh;
}
img {
    max-width: 100%;
    max-height: 100vh;
    object-fit: contain;
}
'''

def create_container_xml():
    """Create the container.xml file."""
    logger.debug("Creating container.xml")
    container = xml.etree.ElementTree.Element('container', {
        'version': '1.0',
        'xmlns': 'urn:oasis:names:tc:opendocument:xmlns:container'
    })
    rootfiles = xml.etree.ElementTree.SubElement(container, 'rootfiles')
    xml.etree.ElementTree.SubElement(rootfiles, 'rootfile', {
        'full-path': 'OEBPS/content.opf',
        'media-type': 'application/oebps-package+xml'
    })
    xml_content = prettify_xml(container)
    logger.debug("container.xml content:\n" + xml_content)
    return xml_content

def create_content_opf(metadata, spine_items, manifest_items):
    """Create the content.opf file."""
    logger.debug("Creating content.opf")
    logger.debug(f"Metadata: {metadata}")
    logger.debug(f"Spine items: {spine_items}")
    logger.debug(f"Manifest items: {manifest_items}")
    
    package = xml.etree.ElementTree.Element('package', {
        'xmlns': 'http://www.idpf.org/2007/opf',
        'xmlns:dc': 'http://purl.org/dc/elements/1.1/',
        'xmlns:dcterms': 'http://purl.org/dc/terms/',
        'xmlns:opf': 'http://www.idpf.org/2007/opf',
        'version': '3.0',
        'unique-identifier': 'bookid'
    })
    
    # Metadata
    metadata_elem = xml.etree.ElementTree.SubElement(package, 'metadata')
    
    # Required metadata
    book_id = str(uuid.uuid4())
    xml.etree.ElementTree.SubElement(metadata_elem, 'dc:identifier', {'id': 'bookid'}).text = book_id
    logger.debug(f"Generated book ID: {book_id}")
    
    xml.etree.ElementTree.SubElement(metadata_elem, 'dc:title').text = metadata.get('title', 'Untitled')
    xml.etree.ElementTree.SubElement(metadata_elem, 'dc:language').text = metadata.get('language', 'en')
    xml.etree.ElementTree.SubElement(metadata_elem, 'dc:creator').text = metadata.get('author', 'Unknown')
    
    # Add required dcterms:modified
    current_time = datetime.now().strftime('%Y-%m-%dT%H:%M:%SZ')
    xml.etree.ElementTree.SubElement(metadata_elem, 'meta', {
        'property': 'dcterms:modified'
    }).text = current_time
    
    # Add cover metadata
    xml.etree.ElementTree.SubElement(metadata_elem, 'meta', {
        'name': 'cover',
        'content': 'cover-image'
    })
    
    # Add additional metadata
    if metadata.get('publisher'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'dc:publisher').text = metadata['publisher']
    
    if metadata.get('description'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'dc:description').text = metadata['description']
    
    if metadata.get('rights'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'dc:rights').text = metadata['rights']
    
    if metadata.get('subject'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'dc:subject').text = metadata['subject']
    
    if metadata.get('isbn'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'dc:identifier', {
            'opf:scheme': 'ISBN'
        }).text = metadata['isbn']
    
    # Series metadata
    if metadata.get('series'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'meta', {
            'property': 'belongs-to-collection'
        }).text = metadata['series']
        xml.etree.ElementTree.SubElement(metadata_elem, 'meta', {
            'property': 'group-position'
        }).text = metadata.get('volume', '1')
    
    # Release date
    if metadata.get('release_date'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'dc:date').text = metadata['release_date']
    
    # Version and edition
    if metadata.get('version'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'meta', {
            'property': 'schema:version'
        }).text = metadata['version']
    
    if metadata.get('edition'):
        xml.etree.ElementTree.SubElement(metadata_elem, 'meta', {
            'property': 'schema:bookEdition'
        }).text = metadata['edition']
    
    # Manifest
    manifest = xml.etree.ElementTree.SubElement(package, 'manifest')
    for item in manifest_items:
        xml.etree.ElementTree.SubElement(manifest, 'item', item)
    
    # Spine
    spine = xml.etree.ElementTree.SubElement(package, 'spine')
    for item in spine_items:
        xml.etree.ElementTree.SubElement(spine, 'itemref', {'idref': item})
    
    xml_content = prettify_xml(package)
    logger.debug("content.opf content:\n" + xml_content)
    return xml_content

def create_toc_ncx(metadata, nav_points):
    """Create the toc.ncx file."""
    logger.debug("Creating toc.ncx")
    logger.debug(f"Navigation points: {nav_points}")
    
    ncx = xml.etree.ElementTree.Element('ncx', {
        'xmlns': 'http://www.daisy.org/z3986/2005/ncx/',
        'version': '2005-1'
    })
    
    head = xml.etree.ElementTree.SubElement(ncx, 'head')
    book_id = str(uuid.uuid4())
    xml.etree.ElementTree.SubElement(head, 'meta', {'name': 'dtb:uid', 'content': book_id})
    logger.debug(f"Generated NCX book ID: {book_id}")
    
    xml.etree.ElementTree.SubElement(head, 'meta', {'name': 'dtb:depth', 'content': '1'})
    xml.etree.ElementTree.SubElement(head, 'meta', {'name': 'dtb:totalPageCount', 'content': '0'})
    xml.etree.ElementTree.SubElement(head, 'meta', {'name': 'dtb:maxPageNumber', 'content': '0'})
    
    doc_title = xml.etree.ElementTree.SubElement(ncx, 'docTitle')
    xml.etree.ElementTree.SubElement(doc_title, 'text').text = metadata.get('title', 'Untitled')
    
    nav_map = xml.etree.ElementTree.SubElement(ncx, 'navMap')
    for i, (id, label, src) in enumerate(nav_points, 1):
        nav_point = xml.etree.ElementTree.SubElement(nav_map, 'navPoint', {'id': id, 'playOrder': str(i)})
        nav_label = xml.etree.ElementTree.SubElement(nav_point, 'navLabel')
        xml.etree.ElementTree.SubElement(nav_label, 'text').text = label
        xml.etree.ElementTree.SubElement(nav_point, 'content', {'src': src})
    
    xml_content = prettify_xml(ncx)
    logger.debug("toc.ncx content:\n" + xml_content)
    return xml_content

def create_nav_xhtml(metadata, nav_points):
    """Create the nav.xhtml file."""
    logger.debug("Creating nav.xhtml")
    
    html = xml.etree.ElementTree.Element('html', {
        'xmlns': 'http://www.w3.org/1999/xhtml',
        'xmlns:epub': 'http://www.idpf.org/2007/ops'
    })
    
    head = xml.etree.ElementTree.SubElement(html, 'head')
    xml.etree.ElementTree.SubElement(head, 'title').text = 'Table of Contents'
    
    body = xml.etree.ElementTree.SubElement(html, 'body')
    nav = xml.etree.ElementTree.SubElement(body, 'nav', {'epub:type': 'toc'})
    ol = xml.etree.ElementTree.SubElement(nav, 'ol')
    
    for _, label, src in nav_points:
        li = xml.etree.ElementTree.SubElement(ol, 'li')
        xml.etree.ElementTree.SubElement(li, 'a', {'href': src}).text = label
    
    xml_content = prettify_xml(html)
    logger.debug("nav.xhtml content:\n" + xml_content)
    return xml_content

def create_page_xhtml(page_number, image_file):
    """Create an XHTML page for an image."""
    logger.debug(f"Creating page {page_number} for image {image_file}")
    
    html = xml.etree.ElementTree.Element('html', {
        'xmlns': 'http://www.w3.org/1999/xhtml',
        'xmlns:epub': 'http://www.idpf.org/2007/ops'
    })
    
    head = xml.etree.ElementTree.SubElement(html, 'head')
    xml.etree.ElementTree.SubElement(head, 'title').text = f'Page {page_number}'
    xml.etree.ElementTree.SubElement(head, 'link', {
        'rel': 'stylesheet',
        'type': 'text/css',
        'href': 'style.css'
    })
    
    body = xml.etree.ElementTree.SubElement(html, 'body')
    xml.etree.ElementTree.SubElement(body, 'img', {
        'src': f'images/{image_file}',
        'alt': f'Page {page_number}'
    })
    
    xml_content = prettify_xml(html)
    logger.debug(f"Page {page_number} XHTML content:\n" + xml_content)
    return xml_content

def prettify_xml(elem):
    """Convert XML element to pretty string."""
    rough_string = xml.etree.ElementTree.tostring(elem, 'utf-8')
    reparsed = minidom.parseString(rough_string)
    return reparsed.toprettyxml(indent="  ")

def create_epub_from_images(image_folder, output_file, metadata):
    logger.info(f"Starting EPUB creation from images in {image_folder}")
    logger.info(f"Output file will be: {output_file}")
    logger.info(f"Metadata: {metadata}")
    
    # Get all image files
    image_files = [f for f in os.listdir(image_folder) 
                  if f.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp'))]
    image_files.sort()
    logger.info(f"Found {len(image_files)} image files")
    logger.debug(f"Image files: {image_files}")
    
    if not image_files:
        logger.error("No image files found in the specified folder")
        sys.exit(1)
    
    # Create ZIP file (EPUB)
    logger.info("Creating EPUB file structure")
    with zipfile.ZipFile(output_file, 'w', zipfile.ZIP_DEFLATED) as epub:
        # Add mimetype (must be first, uncompressed)
        logger.debug("Adding mimetype file (uncompressed)")
        epub.writestr('mimetype', 'application/epub+zip', zipfile.ZIP_STORED)
        
        # Create META-INF directory
        logger.debug("Adding container.xml")
        epub.writestr('META-INF/container.xml', create_container_xml())
        
        # Create OEBPS directory structure
        logger.debug("Creating OEBPS directory structure")
        os.makedirs('temp/OEBPS/images', exist_ok=True)
        os.makedirs('temp/OEBPS/style', exist_ok=True)
        
        # Add CSS
        logger.debug("Adding style.css")
        epub.writestr('OEBPS/style.css', CSS_CONTENT)
        
        # Process images and create pages
        logger.info("Processing images and creating pages")
        manifest_items = [
            {'id': 'style', 'href': 'style.css', 'media-type': 'text/css'},
            {'id': 'nav', 'href': 'nav.xhtml', 'media-type': 'application/xhtml+xml', 'properties': 'nav'}
        ]
        spine_items = []
        nav_points = []
        
        for i, image_file in enumerate(image_files, 1):
            logger.debug(f"Processing image {i:03d}/{len(image_files):03d}: {image_file}")
            
            # Copy image to temp directory
            image_path = os.path.join(image_folder, image_file)
            logger.debug(f"Reading image: {image_path}")
            with open(image_path, 'rb') as f:
                image_data = f.read()
            logger.debug(f"Adding image to EPUB: OEBPS/images/{image_file}")
            epub.writestr(f'OEBPS/images/{image_file}', image_data)
            
            # Add image to manifest
            image_id = f'image_{i:03d}'
            if i == 1:
                image_id = 'cover-image'  # Special ID for cover image
            manifest_items.append({
                'id': image_id,
                'href': f'images/{image_file}',
                'media-type': 'image/jpeg' if image_file.lower().endswith(('.jpg', '.jpeg')) else 'image/png'
            })
            
            # Create page XHTML
            page_id = f'page_{i:03d}'
            logger.debug(f"Creating page XHTML: {page_id}.xhtml")
            page_content = create_page_xhtml(i, image_file)
            epub.writestr(f'OEBPS/{page_id}.xhtml', page_content)
            
            # Add to manifest and spine
            manifest_items.append({
                'id': page_id,
                'href': f'{page_id}.xhtml',
                'media-type': 'application/xhtml+xml'
            })
            spine_items.append(page_id)
            
            # Add to navigation points
            nav_points.append((
                f'navpoint-{i:03d}',
                'Cover' if i == 1 else f'Page {i:03d}',
                f'{page_id}.xhtml'
            ))
        
        # Create content.opf
        logger.debug("Creating content.opf")
        epub.writestr('OEBPS/content.opf', create_content_opf(metadata, spine_items, manifest_items))
        
        # Create toc.ncx
        logger.debug("Creating toc.ncx")
        epub.writestr('OEBPS/toc.ncx', create_toc_ncx(metadata, nav_points))
        
        # Create nav.xhtml
        logger.debug("Creating nav.xhtml")
        epub.writestr('OEBPS/nav.xhtml', create_nav_xhtml(metadata, nav_points))
    
    logger.info(f"Successfully created EPUB file: {output_file}")
    logger.info("EPUB structure:")
    logger.info("  mimetype")
    logger.info("  META-INF/container.xml")
    logger.info("  OEBPS/")
    logger.info("    content.opf")
    logger.info("    toc.ncx")
    logger.info("    nav.xhtml")
    logger.info("    style.css")
    logger.info("    images/")
    for i in range(1, len(image_files) + 1):
        logger.info(f"    page_{i:03d}.xhtml")

def generate_default_filename(metadata, image_folder):
    """Generate default EPUB filename based on metadata."""
    # Get title from metadata or use folder name
    title = metadata.get('title')
    if not title:
        # Get folder name and extract part before last underscore
        folder_name = os.path.basename(os.path.normpath(image_folder))
        title = folder_name.rsplit('_', 1)[0] if '_' in folder_name else folder_name
    
    # Format title: remove spaces, hyphens, quotes and capitalize
    title = ''.join(word.capitalize() for word in title.replace('-', ' ').replace('"', '').replace("'", '').split())
    
    # Format volume number with 2 digits
    volume = metadata.get('volume', '01')
    if volume.isdigit():
        volume = f"{int(volume):02d}"
    
    # Get edition number
    edition = metadata.get('edition', '1')
    
    return f"{title}_{volume}_{edition}.epub"

def main():
    parser = argparse.ArgumentParser(description='Create an EPUB from a folder of images')
    parser.add_argument('image_folder', help='Folder containing the images')
    parser.add_argument('--output-file', '-o', help='Output EPUB file path (optional)')
    parser.add_argument('--title', help='Book title')
    parser.add_argument('--author', help='Book author')
    parser.add_argument('--series', help='Series name')
    parser.add_argument('--volume', help='Volume number')
    parser.add_argument('--release-date', help='Release date (YYYY-MM-DD)')
    parser.add_argument('--edition', help='Edition number')
    parser.add_argument('--version', help='Version number')
    parser.add_argument('--language', help='Book language (default: en)')
    parser.add_argument('--publisher', help='Publisher name')
    parser.add_argument('--description', help='Book description')
    parser.add_argument('--rights', help='Copyright/license information')
    parser.add_argument('--subject', help='Book subject/category')
    parser.add_argument('--isbn', help='ISBN number')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    
    args = parser.parse_args()

    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.info("Debug logging enabled")

    if not os.path.exists(args.image_folder):
        logger.error(f"Image folder does not exist: {args.image_folder}")
        sys.exit(1)

    if not os.path.isdir(args.image_folder):
        logger.error(f"Specified path is not a directory: {args.image_folder}")
        sys.exit(1)

    metadata = {
        'title': args.title,
        'author': args.author,
        'series': args.series,
        'volume': args.volume,
        'release_date': args.release_date,
        'edition': args.edition,
        'version': args.version,
        'language': args.language,
        'publisher': args.publisher,
        'description': args.description,
        'rights': args.rights,
        'subject': args.subject,
        'isbn': args.isbn
    }

    # Remove None values from metadata
    metadata = {k: v for k, v in metadata.items() if v is not None}

    # Generate output filename if not provided
    if not args.output_file:
        args.output_file = generate_default_filename(metadata, args.image_folder)
        logger.info(f"Using default output filename: {args.output_file}")

    try:
        create_epub_from_images(args.image_folder, args.output_file, metadata)
        logger.info("EPUB creation completed successfully")
    except Exception as e:
        logger.error(f"EPUB creation failed: {str(e)}")
        sys.exit(1)

if __name__ == '__main__':
    main()
    

PostHeaderIcon Understanding Chi-Square Tests: A Comprehensive Guide for Developers

In the world of software development and data analysis, understanding statistical significance is crucial. Whether you’re running A/B tests, analyzing user behavior, or building machine learning models, the Chi-Square (χ²) test is an essential tool in your statistical toolkit. This comprehensive guide will help you understand its principles, implementation, and practical applications.

What is Chi-Square?

The Chi-Square test is a statistical method used to determine if there’s a significant difference between expected and observed frequencies in categorical data. It’s named after the Greek letter χ (chi) and is particularly useful for analyzing relationships between categorical variables.

Historical Context

The Chi-Square test was developed by Karl Pearson in 1900, making it one of the oldest statistical tests still in widespread use today. Its development marked a significant advancement in statistical analysis, particularly in the field of categorical data analysis.

Core Principles and Mathematical Foundation

  • Null Hypothesis (H₀): Assumes no significant difference between observed and expected data
  • Alternative Hypothesis (H₁): Suggests a significant difference exists
  • Degrees of Freedom: Number of categories minus constraints
  • P-value: Probability of observing the results if H₀ is true

The Chi-Square Formula

The Chi-Square statistic is calculated using the formula:

χ² = Σ [(O - E)² / E]

Where:
– O = Observed frequency
– E = Expected frequency
– Σ = Sum over all categories

Practical Implementation

1. A/B Testing Implementation (Python)

from scipy.stats import chi2_contingency
import numpy as np
import matplotlib.pyplot as plt

def perform_ab_test(control_data, treatment_data):
    """
    Perform A/B test using Chi-Square test
    
    Args:
        control_data: List of [successes, failures] for control group
        treatment_data: List of [successes, failures] for treatment group
    """
    # Create contingency table
    observed = np.array([control_data, treatment_data])
    
    # Perform Chi-Square test
    chi2, p_value, dof, expected = chi2_contingency(observed)
    
    # Calculate effect size (Cramer's V)
    n = np.sum(observed)
    min_dim = min(observed.shape) - 1
    cramers_v = np.sqrt(chi2 / (n * min_dim))
    
    return {
        'chi2': chi2,
        'p_value': p_value,
        'dof': dof,
        'expected': expected,
        'effect_size': cramers_v
    }

# Example usage
control = [100, 150]  # [clicks, no-clicks] for control
treatment = [120, 130]  # [clicks, no-clicks] for treatment

results = perform_ab_test(control, treatment)
print(f"Chi-Square: {results['chi2']:.2f}")
print(f"P-value: {results['p_value']:.4f}")
print(f"Effect Size (Cramer's V): {results['effect_size']:.3f}")

2. Feature Selection Implementation (Java)

import org.apache.commons.math3.stat.inference.ChiSquareTest;
import java.util.Arrays;

public class FeatureSelection {
    private final ChiSquareTest chiSquareTest;
    
    public FeatureSelection() {
        this.chiSquareTest = new ChiSquareTest();
    }
    
    public FeatureSelectionResult analyzeFeature(
            long[][] observed,
            double significanceLevel) {
        
        double pValue = chiSquareTest.chiSquareTest(observed);
        boolean isSignificant = pValue < significanceLevel;
        
        // Calculate effect size (Cramer's V)
        double chiSquare = chiSquareTest.chiSquare(observed);
        long total = Arrays.stream(observed)
                .flatMapToLong(Arrays::stream)
                .sum();
        int minDim = Math.min(observed.length, observed[0].length) - 1;
        double cramersV = Math.sqrt(chiSquare / (total * minDim));
        
        return new FeatureSelectionResult(
            pValue,
            isSignificant,
            cramersV
        );
    }
    
    public static class FeatureSelectionResult {
        private final double pValue;
        private final boolean isSignificant;
        private final double effectSize;
        
        // Constructor and getters
    }
}

Advanced Applications

1. Machine Learning Feature Selection

Chi-Square tests are particularly useful in feature selection for machine learning models. Here's how to implement it in Python using scikit-learn:

from sklearn.feature_selection import SelectKBest, chi2
from sklearn.datasets import load_iris
import pandas as pd

# Load dataset
iris = load_iris()
X = pd.DataFrame(iris.data, columns=iris.feature_names)
y = iris.target

# Select top 2 features using Chi-Square
selector = SelectKBest(chi2, k=2)
X_new = selector.fit_transform(X, y)

# Get selected features
selected_features = X.columns[selector.get_support()]
print(f"Selected features: {selected_features.tolist()}")

2. Goodness-of-Fit Testing

Testing if your data follows a particular distribution:

from scipy.stats import chisquare
import numpy as np

# Example: Testing if dice is fair
observed = np.array([18, 16, 15, 17, 16, 18])  # Observed frequencies
expected = np.array([16.67, 16.67, 16.67, 16.67, 16.67, 16.67])  # Expected for fair dice

chi2, p_value = chisquare(observed, expected)
print(f"Chi-Square: {chi2:.2f}")
print(f"P-value: {p_value:.4f}")

Best Practices and Considerations

  • Sample Size: Ensure sufficient sample size for reliable results
  • Expected Frequencies: Each expected frequency should be ≥ 5
  • Multiple Testing: Apply corrections (e.g., Bonferroni) when conducting multiple tests
  • Effect Size: Consider effect size in addition to p-values
  • Assumptions: Verify test assumptions before application

Common Pitfalls to Avoid

  • Using Chi-Square for continuous data
  • Ignoring small expected frequencies
  • Overlooking multiple testing issues
  • Focusing solely on p-values without considering effect size
  • Applying the test without checking assumptions

Resources and Further Reading

Understanding and properly implementing Chi-Square tests can significantly enhance your data analysis capabilities as a developer. Whether you're working on A/B testing, feature selection, or data validation, this statistical tool provides valuable insights into your data's relationships and distributions.

Remember to always consider the context of your analysis, verify assumptions, and interpret results carefully. Happy coding!

PostHeaderIcon RSS to EPUB Converter: Create eBooks from RSS Feeds

Overview

This Python script (rss_to_ebook.py) converts RSS or Atom feeds into EPUB format eBooks, allowing you to read your favorite blog posts and news articles offline in your preferred e-reader. The script intelligently handles both RSS 2.0 and Atom feed formats, preserving HTML formatting while creating a clean, readable eBook.

Key Features

  • Dual Format Support: Works with both RSS 2.0 and Atom feeds
  • Smart Pagination: Automatically handles paginated feeds using multiple detection methods
  • Date Range Filtering: Select specific date ranges for content inclusion
  • Metadata Preservation: Maintains feed metadata including title, author, and description
  • HTML Formatting: Preserves original HTML formatting while cleaning unnecessary elements
  • Duplicate Prevention: Automatically detects and removes duplicate entries
  • Comprehensive Logging: Detailed progress tracking and error reporting

Technical Details

The script uses several Python libraries:

  • feedparser: For parsing RSS and Atom feeds
  • ebooklib: For creating EPUB files
  • BeautifulSoup: For HTML cleaning and processing
  • logging: For detailed operation tracking

Usage

python rss_to_ebook.py <feed_url> [--start-date YYYY-MM-DD] [--end-date YYYY-MM-DD] [--output filename.epub] [--debug]

Parameters:

  • feed_url: URL of the RSS or Atom feed (required)
  • --start-date: Start date for content inclusion (default: 1 year ago)
  • --end-date: End date for content inclusion (default: today)
  • --output: Output EPUB filename (default: rss_feed.epub)
  • --debug: Enable detailed logging

Example

python rss_to_ebook.py https://example.com/feed --start-date 2024-01-01 --end-date 2024-03-31 --output my_blog.epub

Requirements

  • Python 3.x
  • Required packages (install via pip):
    pip install feedparser ebooklib beautifulsoup4

How It Works

  1. Feed Detection: Automatically identifies feed format (RSS 2.0 or Atom)
  2. Content Processing:
    • Extracts entries within specified date range
    • Preserves HTML formatting while cleaning unnecessary elements
    • Handles pagination to get all available content
  3. EPUB Creation:
    • Creates chapters from feed entries
    • Maintains original formatting and links
    • Includes table of contents and navigation
    • Preserves feed metadata

Error Handling

  • Validates feed format and content
  • Handles malformed HTML
  • Provides detailed error messages and logging
  • Gracefully handles missing or incomplete feed data

Use Cases

  • Create eBooks from your favorite blogs
  • Archive important news articles
  • Generate reading material for offline use
  • Create compilations of related content

Gist: GitHub

Here is the script:

#!/usr/bin/env python3

import feedparser
import argparse
from datetime import datetime, timedelta
from ebooklib import epub
import re
from bs4 import BeautifulSoup
import logging

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def clean_html(html_content):
    """Clean HTML content while preserving formatting."""
    soup = BeautifulSoup(html_content, 'html.parser')
    
    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()
    
    # Remove any inline styles
    for tag in soup.find_all(True):
        if 'style' in tag.attrs:
            del tag.attrs['style']
    
    # Return the cleaned HTML
    return str(soup)

def get_next_feed_page(current_feed, feed_url):
    """Get the next page of the feed using various pagination methods."""
    # Method 1: next_page link in feed
    if hasattr(current_feed, 'next_page'):
        logging.info(f"Found next_page link: {current_feed.next_page}")
        return current_feed.next_page
    
    # Method 2: Atom-style pagination
    if hasattr(current_feed.feed, 'links'):
        for link in current_feed.feed.links:
            if link.get('rel') == 'next':
                logging.info(f"Found Atom-style next link: {link.href}")
                return link.href
    
    # Method 3: RSS 2.0 pagination (using lastBuildDate)
    if hasattr(current_feed.feed, 'lastBuildDate'):
        last_date = current_feed.feed.lastBuildDate
        if hasattr(current_feed.entries, 'last'):
            last_entry = current_feed.entries[-1]
            if hasattr(last_entry, 'published_parsed'):
                last_entry_date = datetime(*last_entry.published_parsed[:6])
                # Try to construct next page URL with date parameter
                if '?' in feed_url:
                    next_url = f"{feed_url}&before={last_entry_date.strftime('%Y-%m-%d')}"
                else:
                    next_url = f"{feed_url}?before={last_entry_date.strftime('%Y-%m-%d')}"
                logging.info(f"Constructed date-based next URL: {next_url}")
                return next_url
    
    # Method 4: Check for pagination in feed description
    if hasattr(current_feed.feed, 'description'):
        desc = current_feed.feed.description
        # Look for common pagination patterns in description
        next_page_patterns = [
            r'next page: (https?://\S+)',
            r'older posts: (https?://\S+)',
            r'page \d+: (https?://\S+)'
        ]
        for pattern in next_page_patterns:
            match = re.search(pattern, desc, re.IGNORECASE)
            if match:
                next_url = match.group(1)
                logging.info(f"Found next page URL in description: {next_url}")
                return next_url
    
    return None

def get_feed_type(feed):
    """Determine if the feed is RSS 2.0 or Atom format."""
    if hasattr(feed, 'version') and feed.version.startswith('rss'):
        return 'rss'
    elif hasattr(feed, 'version') and feed.version == 'atom10':
        return 'atom'
    # Try to detect by checking for Atom-specific elements
    elif hasattr(feed.feed, 'links') and any(link.get('rel') == 'self' for link in feed.feed.links):
        return 'atom'
    # Default to RSS if no clear indicators
    return 'rss'

def get_entry_content(entry, feed_type):
    """Get the content of an entry based on feed type."""
    if feed_type == 'atom':
        # Atom format
        if hasattr(entry, 'content'):
            return entry.content[0].value if entry.content else ''
        elif hasattr(entry, 'summary'):
            return entry.summary
    else:
        # RSS 2.0 format
        if hasattr(entry, 'content'):
            return entry.content[0].value if entry.content else ''
        elif hasattr(entry, 'description'):
            return entry.description
    return ''

def get_entry_date(entry, feed_type):
    """Get the publication date of an entry based on feed type."""
    if feed_type == 'atom':
        # Atom format uses updated or published
        if hasattr(entry, 'published_parsed'):
            return datetime(*entry.published_parsed[:6])
        elif hasattr(entry, 'updated_parsed'):
            return datetime(*entry.updated_parsed[:6])
    else:
        # RSS 2.0 format uses pubDate
        if hasattr(entry, 'published_parsed'):
            return datetime(*entry.published_parsed[:6])
    return datetime.now()

def get_feed_metadata(feed, feed_type):
    """Extract metadata from feed based on its type."""
    metadata = {
        'title': '',
        'description': '',
        'language': 'en',
        'author': 'Unknown',
        'publisher': '',
        'rights': '',
        'updated': ''
    }
    
    if feed_type == 'atom':
        # Atom format metadata
        metadata['title'] = feed.feed.get('title', '')
        metadata['description'] = feed.feed.get('subtitle', '')
        metadata['language'] = feed.feed.get('language', 'en')
        metadata['author'] = feed.feed.get('author', 'Unknown')
        metadata['rights'] = feed.feed.get('rights', '')
        metadata['updated'] = feed.feed.get('updated', '')
    else:
        # RSS 2.0 format metadata
        metadata['title'] = feed.feed.get('title', '')
        metadata['description'] = feed.feed.get('description', '')
        metadata['language'] = feed.feed.get('language', 'en')
        metadata['author'] = feed.feed.get('author', 'Unknown')
        metadata['copyright'] = feed.feed.get('copyright', '')
        metadata['lastBuildDate'] = feed.feed.get('lastBuildDate', '')
    
    return metadata

def create_ebook(feed_url, start_date, end_date, output_file):
    """Create an ebook from RSS feed entries within the specified date range."""
    logging.info(f"Starting ebook creation from feed: {feed_url}")
    logging.info(f"Date range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")
    
    # Parse the RSS feed
    feed = feedparser.parse(feed_url)
    
    if feed.bozo:
        logging.error(f"Error parsing feed: {feed.bozo_exception}")
        return False
    
    # Determine feed type
    feed_type = get_feed_type(feed)
    logging.info(f"Detected feed type: {feed_type}")
    
    logging.info(f"Successfully parsed feed: {feed.feed.get('title', 'Unknown Feed')}")
    
    # Create a new EPUB book
    book = epub.EpubBook()
    
    # Extract metadata based on feed type
    metadata = get_feed_metadata(feed, feed_type)
    
    logging.info(f"Setting metadata for ebook: {metadata['title']}")
    
    # Set basic metadata
    book.set_identifier(feed_url)  # Use feed URL as unique identifier
    book.set_title(metadata['title'])
    book.set_language(metadata['language'])
    book.add_author(metadata['author'])
    
    # Add additional metadata if available
    if metadata['description']:
        book.add_metadata('DC', 'description', metadata['description'])
    if metadata['publisher']:
        book.add_metadata('DC', 'publisher', metadata['publisher'])
    if metadata['rights']:
        book.add_metadata('DC', 'rights', metadata['rights'])
    if metadata['updated']:
        book.add_metadata('DC', 'date', metadata['updated'])
    
    # Add date range to description
    date_range_desc = f"Content from {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}"
    book.add_metadata('DC', 'description', f"{metadata['description']}\n\n{date_range_desc}")
    
    # Create table of contents
    chapters = []
    toc = []
    
    # Process entries within date range
    entries_processed = 0
    entries_in_range = 0
    consecutive_out_of_range = 0
    current_page = 1
    processed_urls = set()  # Track processed URLs to avoid duplicates
    
    logging.info("Starting to process feed entries...")
    
    while True:
        logging.info(f"Processing page {current_page} with {len(feed.entries)} entries")
        
        # Process current batch of entries
        for entry in feed.entries[entries_processed:]:
            entries_processed += 1
            
            # Skip if we've already processed this entry
            entry_id = entry.get('id', entry.get('link', ''))
            if entry_id in processed_urls:
                logging.debug(f"Skipping duplicate entry: {entry_id}")
                continue
            processed_urls.add(entry_id)
            
            # Get entry date based on feed type
            entry_date = get_entry_date(entry, feed_type)
            
            if entry_date < start_date:
                consecutive_out_of_range += 1
                logging.debug(f"Skipping entry from {entry_date.strftime('%Y-%m-%d')} (before start date)")
                continue
            elif entry_date > end_date:
                consecutive_out_of_range += 1
                logging.debug(f"Skipping entry from {entry_date.strftime('%Y-%m-%d')} (after end date)")
                continue
            else:
                consecutive_out_of_range = 0
                entries_in_range += 1
                
                # Create chapter
                title = entry.get('title', 'Untitled')
                logging.info(f"Adding chapter: {title} ({entry_date.strftime('%Y-%m-%d')})")
                
                # Get content based on feed type
                content = get_entry_content(entry, feed_type)
                
                # Clean the content
                cleaned_content = clean_html(content)
                
                # Create chapter
                chapter = epub.EpubHtml(
                    title=title,
                    file_name=f'chapter_{len(chapters)}.xhtml',
                    content=f'<h1>{title}</h1>{cleaned_content}'
                )
                
                # Add chapter to book
                book.add_item(chapter)
                chapters.append(chapter)
                toc.append(epub.Link(chapter.file_name, title, chapter.id))
        
        # If we have no entries in range or we've seen too many consecutive out-of-range entries, stop
        if entries_in_range == 0 or consecutive_out_of_range >= 10:
            if entries_in_range == 0:
                logging.warning("No entries found within the specified date range")
            else:
                logging.info(f"Stopping after {consecutive_out_of_range} consecutive out-of-range entries")
            break
            
        # Try to get more entries if available
        next_page_url = get_next_feed_page(feed, feed_url)
        if next_page_url:
            current_page += 1
            logging.info(f"Fetching next page: {next_page_url}")
            feed = feedparser.parse(next_page_url)
            if not feed.entries:
                logging.info("No more entries available")
                break
        else:
            logging.info("No more pages available")
            break
    
    if entries_in_range == 0:
        logging.error("No entries found within the specified date range")
        return False
    
    logging.info(f"Processed {entries_processed} total entries, {entries_in_range} within date range")
    
    # Add table of contents
    book.toc = toc
    
    # Add navigation files
    book.add_item(epub.EpubNcx())
    book.add_item(epub.EpubNav())
    
    # Define CSS style
    style = '''
    @namespace epub "http://www.idpf.org/2007/ops";
    body {
        font-family: Cambria, Liberation Serif, serif;
    }
    h1 {
        text-align: left;
        text-transform: uppercase;
        font-weight: 200;
    }
    '''
    
    # Add CSS file
    nav_css = epub.EpubItem(
        uid="style_nav",
        file_name="style/nav.css",
        media_type="text/css",
        content=style
    )
    book.add_item(nav_css)
    
    # Create spine
    book.spine = ['nav'] + chapters
    
    # Write the EPUB file
    logging.info(f"Writing EPUB file: {output_file}")
    epub.write_epub(output_file, book, {})
    logging.info("EPUB file created successfully")
    return True

def main():
    parser = argparse.ArgumentParser(description='Convert RSS feed to EPUB ebook')
    parser.add_argument('feed_url', help='URL of the RSS feed')
    parser.add_argument('--start-date', help='Start date (YYYY-MM-DD)', 
                        default=(datetime.now() - timedelta(days=365)).strftime('%Y-%m-%d'))
    parser.add_argument('--end-date', help='End date (YYYY-MM-DD)',
                        default=datetime.now().strftime('%Y-%m-%d'))
    parser.add_argument('--output', help='Output EPUB file name',
                        default='rss_feed.epub')
    parser.add_argument('--debug', action='store_true', help='Enable debug logging')
    
    args = parser.parse_args()
    
    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)
    
    # Parse dates
    start_date = datetime.strptime(args.start_date, '%Y-%m-%d')
    end_date = datetime.strptime(args.end_date, '%Y-%m-%d')
    
    # Create ebook
    if create_ebook(args.feed_url, start_date, end_date, args.output):
        logging.info(f"Successfully created ebook: {args.output}")
    else:
        logging.error("Failed to create ebook")

if __name__ == '__main__':
    main()

PostHeaderIcon Quick and dirty script to convert WordPress export file to Blogger / Atom XML

I’ve created a Python script that converts WordPress export files to Blogger/Atom XML format. Here’s how to use it:

The script takes two command-line arguments:

  • wordpress_export.xml: Path to your WordPress export XML file
  • blogger_export.xml : Path where you want to save the converted Blogger/Atom XML file

To run the script:

python wordpress_to_blogger.py wordpress_export.xml blogger_export.xml

The script performs the following conversions:

  • Converts WordPress posts to Atom feed entries
  • Preserves post titles, content, publication dates, and authors
  • Maintains categories as Atom categories
  • Handles post status (published/draft)
  • Preserves HTML content formatting
  • Converts dates to ISO format required by Atom

The script uses Python’s built-in xml.etree.ElementTree module for XML processing and includes error handling to make it robust.
Some important notes:

  • The script only converts posts (not pages or other content types)
  • It preserves the HTML content of your posts
  • It maintains the original publication dates
  • It handles both published and draft posts
  • The output is a valid Atom XML feed that Blogger can import

The file:

#!/usr/bin/env python3
import xml.etree.ElementTree as ET
import sys
import argparse
from datetime import datetime
import re

def convert_wordpress_to_blogger(wordpress_file, output_file):
# Parse WordPress XML
tree = ET.parse(wordpress_file)
root = tree.getroot()

# Create Atom feed
atom = ET.Element('feed', {
'xmlns': 'http://www.w3.org/2005/Atom',
'xmlns:app': 'http://www.w3.org/2007/app',
'xmlns:thr': 'http://purl.org/syndication/thread/1.0'
})

# Add feed metadata
title = ET.SubElement(atom, 'title')
title.text = 'Blog Posts'

updated = ET.SubElement(atom, 'updated')
updated.text = datetime.now().isoformat()

# Process each post
for item in root.findall('.//item'):
if item.find('wp:post_type', {'wp': 'http://wordpress.org/export/1.2/'}).text != 'post':
continue

entry = ET.SubElement(atom, 'entry')

# Title
title = ET.SubElement(entry, 'title')
title.text = item.find('title').text

# Content
content = ET.SubElement(entry, 'content', {'type': 'html'})
content.text = item.find('content:encoded', {'content': 'http://purl.org/rss/1.0/modules/content/'}).text

# Publication date
pub_date = item.find('pubDate').text
published = ET.SubElement(entry, 'published')
published.text = datetime.strptime(pub_date, '%a, %d %b %Y %H:%M:%S %z').isoformat()

# Author
author = ET.SubElement(entry, 'author')
name = ET.SubElement(author, 'name')
name.text = item.find('dc:creator', {'dc': 'http://purl.org/dc/elements/1.1/'}).text

# Categories
for category in item.findall('category'):
category_elem = ET.SubElement(entry, 'category', {'term': category.text})

# Status
status = item.find('wp:status', {'wp': 'http://wordpress.org/export/1.2/'}).text
if status == 'publish':
app_control = ET.SubElement(entry, 'app:control', {'xmlns:app': 'http://www.w3.org/2007/app'})
app_draft = ET.SubElement(app_control, 'app:draft')
app_draft.text = 'no'
else:
app_control = ET.SubElement(entry, 'app:control', {'xmlns:app': 'http://www.w3.org/2007/app'})
app_draft = ET.SubElement(app_control, 'app:draft')
app_draft.text = 'yes'

# Write the output file
tree = ET.ElementTree(atom)
tree.write(output_file, encoding='utf-8', xml_declaration=True)

def main():
parser = argparse.ArgumentParser(description='Convert WordPress export to Blogger/Atom XML format')
parser.add_argument('wordpress_file', help='Path to WordPress export XML file')
parser.add_argument('output_file', help='Path to output Blogger/Atom XML file')

args = parser.parse_args()

try:
convert_wordpress_to_blogger(args.wordpress_file, args.output_file)
print(f"Successfully converted {args.wordpress_file} to {args.output_file}")
except Exception as e:
print(f"Error: {str(e)}")
sys.exit(1)

if __name__ == '__main__':
main()