package io.intino.ness.datalake.hadoop;

import io.intino.alexandria.logger.Logger;
import io.intino.alexandria.zet.Zet;
import io.intino.alexandria.zet.ZetReader;
import io.intino.alexandria.zet.ZetStream;
import io.intino.ness.datalake.hadoop.sessions.SetSessionReader;
import io.intino.ness.ingestion.Fingerprint;
import io.intino.ness.ingestion.Session;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

/* loaded from: input_file:io/intino/ness/datalake/hadoop/SessionSealer.class */
public class SessionSealer {
    private final FileSystem fs;
    private final HadoopStage stage;
    private final Path events;
    private final Path sets;
    private final Path temp;

    /* loaded from: input_file:io/intino/ness/datalake/hadoop/SessionSealer$BlobReducer.class */
    public static class BlobReducer extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        private final BytesWritable zet = new BytesWritable();
        private Text fingerprint = new Text();

        protected void setup(Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
            super.setup(context);
        }

        protected void reduce(Text text, Iterable<BytesWritable> iterable, Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
            new SequenceFile.Reader(context.getConfiguration(), new SequenceFile.Reader.Option[]{SequenceFile.Reader.file(fileOf(context.getConfiguration(), new Fingerprint(text.toString()), new Path(context.getCacheFiles()[0])))});
            iterable.iterator().next().getBytes();
        }

        private Path fileOf(Configuration configuration, Fingerprint fingerprint, Path path) {
            Path path2 = new Path(path, fingerprint.tank() + "/" + fingerprint.timetag() + ".seq");
            try {
                FileSystem.get(configuration).mkdirs(path2);
            } catch (IOException e) {
                Logger.error(e);
            }
            return path2;
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<BytesWritable>) iterable, (Reducer<Text, BytesWritable, Text, BytesWritable>.Context) context);
        }
    }

    /* loaded from: input_file:io/intino/ness/datalake/hadoop/SessionSealer$ChunksCombiner.class */
    public static class ChunksCombiner extends Reducer<Text, BytesWritable, Text, BytesWritable> {
        private final BytesWritable zet = new BytesWritable();
        private Text fingerprint = new Text();

        protected void reduce(Text text, Iterable<BytesWritable> iterable, Reducer<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
            ZetStream zetReader = new ZetReader(new long[0]);
            for (BytesWritable bytesWritable : iterable) {
                zetReader = new ZetStream.Merge(new ZetStream[]{zetReader, new ZetReader(ByteBuffer.allocate(8).put(bytesWritable.getBytes(), 0, bytesWritable.getLength()).asLongBuffer().array())});
            }
            ByteBuffer allocate = ByteBuffer.allocate(8);
            for (long j : new Zet(zetReader).ids()) {
                allocate.putLong(j);
            }
            this.zet.set(allocate.array(), 0, allocate.array().length);
            context.write(this.fingerprint, this.zet);
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<BytesWritable>) iterable, (Reducer<Text, BytesWritable, Text, BytesWritable>.Context) context);
        }
    }

    /* loaded from: input_file:io/intino/ness/datalake/hadoop/SessionSealer$SessionMapper.class */
    public static class SessionMapper extends Mapper<Text, BytesWritable, Text, BytesWritable> {
        private final BytesWritable zet = new BytesWritable();
        private Text fingerprint = new Text();

        protected void map(Text text, BytesWritable bytesWritable, Mapper<Text, BytesWritable, Text, BytesWritable>.Context context) throws IOException, InterruptedException {
            if (text.toString().startsWith(Session.Type.event.name())) {
                mapEventBlob(bytesWritable.getBytes());
                return;
            }
            Map<String, byte[]> mapSetSessionsToZetStreams = mapSetSessionsToZetStreams(bytesWritable.getBytes());
            for (String str : mapSetSessionsToZetStreams.keySet()) {
                this.fingerprint.set(str);
                byte[] bArr = mapSetSessionsToZetStreams.get(str);
                this.zet.set(bArr, 0, bArr.length);
                context.write(this.fingerprint, this.zet);
            }
        }

        private Map<String, byte[]> mapSetSessionsToZetStreams(byte[] bArr) {
            try {
                SetSessionReader setSessionReader = new SetSessionReader(bArr);
                HashMap hashMap = new HashMap();
                for (Fingerprint fingerprint : setSessionReader.fingerprints()) {
                    hashMap.put(fingerprint.toString(), merge(setSessionReader.streamsOf(fingerprint)));
                }
                return hashMap;
            } catch (IOException e) {
                Logger.error(e);
                return Collections.emptyMap();
            }
        }

        private void mapEventBlob(byte[] bArr) {
        }

        private byte[] merge(List<ZetStream> list) {
            ByteBuffer allocate = ByteBuffer.allocate(8);
            for (long j : new Zet(new ZetStream.Merge(list)).ids()) {
                allocate.putLong(j);
            }
            return allocate.array();
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((Text) obj, (BytesWritable) obj2, (Mapper<Text, BytesWritable, Text, BytesWritable>.Context) context);
        }
    }

    public SessionSealer(FileSystem fileSystem, HadoopStage hadoopStage, Path path, Path path2, Path path3) {
        this.fs = fileSystem;
        this.stage = hadoopStage;
        this.events = path;
        this.sets = path2;
        this.temp = path3;
    }

    public void seal() {
        try {
            Job job = Job.getInstance();
            job.setJarByClass(getClass());
            job.setJobName("Sealer");
            FileInputFormat.addInputPath(job, this.stage.path());
            job.setInputFormatClass(SequenceFileInputFormat.class);
            FileOutputFormat.setOutputPath(job, this.sets);
            job.setMapperClass(SessionMapper.class);
            job.setCombinerClass(ChunksCombiner.class);
            job.setReducerClass(BlobReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(BytesWritable.class);
            job.setOutputFormatClass(SequenceFileOutputFormat.class);
            job.addCacheFile(this.sets.toUri());
            job.addCacheFile(this.events.toUri());
            job.setNumReduceTasks(2);
            job.waitForCompletion(true);
            if (job.isSuccessful()) {
                System.out.println("Job was successful");
            } else if (!job.isSuccessful()) {
                System.out.println("Job was not successful");
            }
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            Logger.error(e);
        }
    }
}
